diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index 87d798bebe3f..7a58fe3e2125 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -90,7 +90,7 @@ def compare_path(p1, p2): 3. If no `id` is defined for both paths, then their `names` are compared. """ - result = cmp(p1.kind, p2.kind) + result = (p1.kind > p2.kind) - (p1.kind < p2.kind) if result != 0: return result @@ -98,12 +98,12 @@ def compare_path(p1, p2): if not p2.HasField('id'): return -1 - return cmp(p1.id, p2.id) + return (p1.id > p2.id) - (p1.id < p2.id) if p2.HasField('id'): return 1 - return cmp(p1.name, p2.name) + return (p1.name > p2.name) - (p1.name < p2.name) def get_datastore(project): diff --git a/sdks/python/apache_beam/io/vcfio_test.py b/sdks/python/apache_beam/io/vcfio_test.py index 029515fe3419..a750dc747111 100644 --- a/sdks/python/apache_beam/io/vcfio_test.py +++ b/sdks/python/apache_beam/io/vcfio_test.py @@ -70,15 +70,6 @@ def get_full_dir(): return os.path.join(os.path.dirname(__file__), '..', 'testing', 'data', 'vcf') -# Helper method for comparing variants. -def _variant_comparator(v1, v2): - if v1.reference_name == v2.reference_name: - if v1.start == v2.start: - return cmp(v1.end, v2.end) - return cmp(v1.start, v2.start) - return cmp(v1.reference_name, v2.reference_name) - - # Helper method for verifying equal count on PCollection. def _count_equals_to(expected_count): def _count_equal(actual_list): diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index 8a63e7bd0561..db4cc66b1d0b 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -21,7 +21,7 @@ """ from abc import ABCMeta -from abc import abstractmethod +from functools import total_ordering from apache_beam import coders from apache_beam import core @@ -46,44 +46,65 @@ class Event(object): __metaclass__ = ABCMeta - def __cmp__(self, other): - if type(self) is not type(other): - return cmp(type(self), type(other)) - return self._typed_cmp(other) - - @abstractmethod - def _typed_cmp(self, other): - raise NotImplementedError - +@total_ordering class ElementEvent(Event): """Element-producing test stream event.""" def __init__(self, timestamped_values): self.timestamped_values = timestamped_values - def _typed_cmp(self, other): - return cmp(self.timestamped_values, other.timestamped_values) + def __eq__(self, other): + return (type(self) is type(other) and + self.timestamped_values == other.timestamped_values) + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + return self.timestamped_values < other.timestamped_values +@total_ordering class WatermarkEvent(Event): """Watermark-advancing test stream event.""" def __init__(self, new_watermark): self.new_watermark = timestamp.Timestamp.of(new_watermark) - def _typed_cmp(self, other): - return cmp(self.new_watermark, other.new_watermark) + def __eq__(self, other): + return (type(self) is type(other) and + self.new_watermark == other.new_watermark) + def __ne__(self, other): + return not self.__eq__(other) + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + return self.new_watermark < other.new_watermark + + +@total_ordering class ProcessingTimeEvent(Event): """Processing time-advancing test stream event.""" def __init__(self, advance_by): self.advance_by = timestamp.Duration.of(advance_by) - def _typed_cmp(self, other): - return cmp(self.advance_by, other.advance_by) + def __eq__(self, other): + return (type(self) is type(other) and + self.advance_by == other.advance_by) + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + return self.advance_by < other.advance_by class TestStream(PTransform): diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c250e8c6d365..2a0c5236d93a 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -50,6 +50,7 @@ from __future__ import absolute_import import abc +from functools import total_ordering from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 @@ -177,6 +178,7 @@ def get_transformed_output_time(self, window, input_timestamp): # pylint: disab urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_WINDOWFN) +@total_ordering class BoundedWindow(object): """A window for timestamps in range (-infinity, end). @@ -190,15 +192,18 @@ def __init__(self, end): def max_timestamp(self): return self.end.predecessor() - def __cmp__(self, other): - # Order first by endpoint, then arbitrarily. - return cmp(self.end, other.end) or cmp(hash(self), hash(other)) - def __eq__(self, other): - raise NotImplementedError + return type(self) is type(other) and self.end == other.end - def __hash__(self): - return hash(self.end) + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if type(self) is not type(other): + raise TypeError("Can not compare type %s and %s" % + (type(self), type(other))) + # Order first by endpoint, then arbitrarily. + return (self.end, hash(self)) < (other.end, hash(other)) def __repr__(self): return '[?, %s)' % float(self.end) @@ -220,7 +225,9 @@ def __hash__(self): return hash((self.start, self.end)) def __eq__(self, other): - return self.start == other.start and self.end == other.end + return (type(self) == type(other) and + self.start == other.start and + self.end == other.end) def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) @@ -233,6 +240,7 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) +@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -245,10 +253,14 @@ def __init__(self, value, timestamp): self.value = value self.timestamp = Timestamp.of(timestamp) - def __cmp__(self, other): + def __eq__(self, other): + return self.value == other.value and self.timestamp == other.timestamp + + def __lt__(self, other): if type(self) is not type(other): - return cmp(type(self), type(other)) - return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + raise TypeError("Can not compare type %s and %s" % + (type(self), type(other))) + return (self.value, self.timestamp) < (other.value, other.timestamp) class GlobalWindow(BoundedWindow): diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index 7c41c3002f6d..91d018ffde94 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -20,15 +20,16 @@ For internal use only; no backwards-compatibility guarantees. """ -from __future__ import absolute_import -from __future__ import division +from __future__ import absolute_import, division import datetime import re +from functools import total_ordering import pytz +@total_ordering class Timestamp(object): """Represents a Unix second timestamp with microsecond granularity. @@ -142,11 +143,23 @@ def __int__(self): # Note that the returned value may have lost precision. return self.micros // 1000000 - def __cmp__(self, other): + def __eq__(self, other): # Allow comparisons between Duration and Timestamp values. - if not isinstance(other, Duration): + if not isinstance(other, (Timestamp, Duration)): other = Timestamp.of(other) - return cmp(self.micros, other.micros) + return self.micros == other.micros + + def __ne__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Timestamp.of(other) + return self.micros != other.micros + + def __lt__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Timestamp.of(other) + return self.micros < other.micros def __hash__(self): return hash(self.micros) @@ -171,6 +184,7 @@ def __mod__(self, other): MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff) +@total_ordering class Duration(object): """Represents a second duration with microsecond granularity. @@ -220,11 +234,23 @@ def __float__(self): # Note that the returned value may have lost precision. return self.micros / 1000000 - def __cmp__(self, other): + def __eq__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Duration.of(other) + return self.micros == other.micros + + def __ne__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Duration.of(other) + return self.micros != other.micros + + def __lt__(self, other): # Allow comparisons between Duration and Timestamp values. - if not isinstance(other, Timestamp): + if not isinstance(other, (Timestamp, Duration)): other = Duration.of(other) - return cmp(self.micros, other.micros) + return self.micros < other.micros def __hash__(self): return hash(self.micros) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 1b3228b2e6ed..a586acf96fc0 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -193,7 +193,7 @@ def __cmp__(left, right): # pylint: disable=no-self-argument on unequal values (always returning 1). """ if type(left) is not type(right): - return cmp(type(left), type(right)) + return 1 # TODO(robertwb): Avoid the type checks? # Returns False (0) if equal, and True (1) if not.