Skip to content
Closed
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,20 @@ def compare_path(p1, p2):
3. If no `id` is defined for both paths, then their `names` are compared.
"""

result = cmp(p1.kind, p2.kind)
result = (p1.kind > p2.kind) - (p1.kind < p2.kind)
if result != 0:
return result

if p1.HasField('id'):
if not p2.HasField('id'):
return -1

return cmp(p1.id, p2.id)
return (p1.id > p2.id) - (p1.id < p2.id)

if p2.HasField('id'):
return 1

return cmp(p1.name, p2.name)
return (p1.name > p2.name) - (p1.name < p2.name)


def get_datastore(project):
Expand Down
9 changes: 0 additions & 9 deletions sdks/python/apache_beam/io/vcfio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ def get_full_dir():
return os.path.join(os.path.dirname(__file__), '..', 'testing', 'data', 'vcf')


# Helper method for comparing variants.
def _variant_comparator(v1, v2):
if v1.reference_name == v2.reference_name:
if v1.start == v2.start:
return cmp(v1.end, v2.end)
return cmp(v1.start, v2.start)
return cmp(v1.reference_name, v2.reference_name)


# Helper method for verifying equal count on PCollection.
def _count_equals_to(expected_count):
def _count_equal(actual_list):
Expand Down
53 changes: 37 additions & 16 deletions sdks/python/apache_beam/testing/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"""

from abc import ABCMeta
from abc import abstractmethod
from functools import total_ordering
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@charlesccychen

Where do we compare these events?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping?


from apache_beam import coders
from apache_beam import core
Expand All @@ -46,44 +46,65 @@ class Event(object):

__metaclass__ = ABCMeta

def __cmp__(self, other):
if type(self) is not type(other):
return cmp(type(self), type(other))
return self._typed_cmp(other)

@abstractmethod
def _typed_cmp(self, other):
raise NotImplementedError


@total_ordering
class ElementEvent(Event):
"""Element-producing test stream event."""

def __init__(self, timestamped_values):
self.timestamped_values = timestamped_values

def _typed_cmp(self, other):
return cmp(self.timestamped_values, other.timestamped_values)
def __eq__(self, other):
return (type(self) is type(other) and
self.timestamped_values == other.timestamped_values)

def __ne__(self, other):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to implement `__ne__`? Isn't this the default implementation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Python 3 we are fine, but in Python 2.7 `__ne__` isn't inherited from `object`. See the docs for `total_ordering` and the notes in https://bugs.python.org/issue25732

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

return not self.__eq__(other)

def __lt__(self, other):
if type(self) is not type(other):
return type(self) < type(other)
return self.timestamped_values < other.timestamped_values


@total_ordering
class WatermarkEvent(Event):
"""Watermark-advancing test stream event."""

def __init__(self, new_watermark):
self.new_watermark = timestamp.Timestamp.of(new_watermark)

def _typed_cmp(self, other):
return cmp(self.new_watermark, other.new_watermark)
def __eq__(self, other):
return (type(self) is type(other) and
self.new_watermark == other.new_watermark)

def __ne__(self, other):
return not self.__eq__(other)

def __lt__(self, other):
if type(self) is not type(other):
return type(self) < type(other)
return self.new_watermark < other.new_watermark


@total_ordering
class ProcessingTimeEvent(Event):
"""Processing time-advancing test stream event."""

def __init__(self, advance_by):
self.advance_by = timestamp.Duration.of(advance_by)

def _typed_cmp(self, other):
return cmp(self.advance_by, other.advance_by)
def __eq__(self, other):
return (type(self) is type(other) and
self.advance_by == other.advance_by)

def __ne__(self, other):
return not self.__eq__(other)

def __lt__(self, other):
if type(self) is not type(other):
return type(self) < type(other)
return self.advance_by < other.advance_by


class TestStream(PTransform):
Expand Down
34 changes: 23 additions & 11 deletions sdks/python/apache_beam/transforms/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from __future__ import absolute_import

import abc
from functools import total_ordering

from google.protobuf import duration_pb2
from google.protobuf import timestamp_pb2
Expand Down Expand Up @@ -177,6 +178,7 @@ def get_transformed_output_time(self, window, input_timestamp): # pylint: disab
urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_WINDOWFN)


@total_ordering
class BoundedWindow(object):
"""A window for timestamps in range (-infinity, end).

Expand All @@ -190,15 +192,18 @@ def __init__(self, end):
def max_timestamp(self):
return self.end.predecessor()

def __cmp__(self, other):
# Order first by endpoint, then arbitrarily.
return cmp(self.end, other.end) or cmp(hash(self), hash(other))

def __eq__(self, other):
raise NotImplementedError
return type(self) is type(other) and self.end == other.end

def __hash__(self):
return hash(self.end)
def __ne__(self, other):
return not self.__eq__(other)

def __lt__(self, other):
if type(self) is not type(other):
raise TypeError("Can not compare type %s and %s" %
(type(self), type(other)))
# Order first by endpoint, then arbitrarily.
return (self.end, hash(self)) < (other.end, hash(other))

def __repr__(self):
return '[?, %s)' % float(self.end)
Expand All @@ -220,7 +225,9 @@ def __hash__(self):
return hash((self.start, self.end))

def __eq__(self, other):
return self.start == other.start and self.end == other.end
return (type(self) == type(other) and
self.start == other.start and
self.end == other.end)

def __repr__(self):
return '[%s, %s)' % (float(self.start), float(self.end))
Expand All @@ -233,6 +240,7 @@ def union(self, other):
min(self.start, other.start), max(self.end, other.end))


@total_ordering
class TimestampedValue(object):
"""A timestamped value having a value and a timestamp.

Expand All @@ -245,10 +253,14 @@ def __init__(self, value, timestamp):
self.value = value
self.timestamp = Timestamp.of(timestamp)

def __cmp__(self, other):
def __eq__(self, other):
return self.value == other.value and self.timestamp == other.timestamp

def __lt__(self, other):
if type(self) is not type(other):
return cmp(type(self), type(other))
return cmp((self.value, self.timestamp), (other.value, other.timestamp))
raise TypeError("Can not compare type %s and %s" %
(type(self), type(other)))
return (self.value, self.timestamp) < (other.value, other.timestamp)


class GlobalWindow(BoundedWindow):
Expand Down
42 changes: 34 additions & 8 deletions sdks/python/apache_beam/utils/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@
For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import absolute_import, division

import datetime
import re
from functools import total_ordering

import pytz


@total_ordering
class Timestamp(object):
"""Represents a Unix second timestamp with microsecond granularity.

Expand Down Expand Up @@ -142,11 +143,23 @@ def __int__(self):
# Note that the returned value may have lost precision.
return self.micros // 1000000

def __cmp__(self, other):
def __eq__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Duration):
if not isinstance(other, (Timestamp, Duration)):
other = Timestamp.of(other)
return cmp(self.micros, other.micros)
return self.micros == other.micros

def __ne__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, (Timestamp, Duration)):
other = Timestamp.of(other)
return self.micros != other.micros

def __lt__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, (Timestamp, Duration)):
other = Timestamp.of(other)
return self.micros < other.micros

def __hash__(self):
return hash(self.micros)
Expand All @@ -171,6 +184,7 @@ def __mod__(self, other):
MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff)


@total_ordering
class Duration(object):
"""Represents a second duration with microsecond granularity.

Expand Down Expand Up @@ -220,11 +234,23 @@ def __float__(self):
# Note that the returned value may have lost precision.
return self.micros / 1000000

def __cmp__(self, other):
def __eq__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, (Timestamp, Duration)):
other = Duration.of(other)
return self.micros == other.micros

def __ne__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, (Timestamp, Duration)):
other = Duration.of(other)
return self.micros != other.micros

def __lt__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Timestamp):
if not isinstance(other, (Timestamp, Duration)):
other = Duration.of(other)
return cmp(self.micros, other.micros)
return self.micros < other.micros

def __hash__(self):
return hash(self.micros)
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/utils/windowed_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def __cmp__(left, right): # pylint: disable=no-self-argument
on unequal values (always returning 1).
"""
if type(left) is not type(right):
return cmp(type(left), type(right))
return 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change in behaviour even though it matches the comment above. What does this affect?

Copy link
Contributor Author

@holdenk holdenk Mar 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the tests pass. If you were to sort a windowed value together with something that is not a windowed value, the ordering could change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

cc: @robertwb since this pertains to the todo below.


# TODO(robertwb): Avoid the type checks?
# Returns False (0) if equal, and True (1) if not.
Expand Down