From 16beed905d44a84d0f608078207f528ecd890aaa Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 28 Feb 2018 13:49:41 -0800 Subject: [PATCH 1/9] Change windowed value to not use cmp anymore. Note: this file as marked as performance important so please take a look, but this should be strictly a perf improvement. --- sdks/python/apache_beam/utils/windowed_value.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 1b3228b2e6ed..a586acf96fc0 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -193,7 +193,7 @@ def __cmp__(left, right): # pylint: disable=no-self-argument on unequal values (always returning 1). """ if type(left) is not type(right): - return cmp(type(left), type(right)) + return 1 # TODO(robertwb): Avoid the type checks? # Returns False (0) if equal, and True (1) if not. From d190bc2d27e467b89ff4e6fa3412e003adf49942 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 28 Feb 2018 13:51:09 -0800 Subject: [PATCH 2/9] Deprecate the comparators since I didn't find an active use for them and it encourages cmp like behaviour. --- sdks/python/apache_beam/io/gcp/datastore/v1/helper.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index 87d798bebe3f..93492a3847ae 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -31,6 +31,7 @@ # pylint: disable=ungrouped-imports from apache_beam.internal.gcp import auth from apache_beam.utils import retry +from apache_beam.utils.annotations import deprecated # Protect against environments where datastore library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -50,6 +51,7 @@ # pylint: enable=ungrouped-imports +@deprecated(since="v2.4") def key_comparator(k1, k2): """A comparator for Datastore keys. @@ -78,6 +80,7 @@ def key_comparator(k1, k2): return 0 +@deprecated(since="v2.4") def compare_path(p1, p2): """A comparator for key path. @@ -90,7 +93,7 @@ def compare_path(p1, p2): 3. If no `id` is defined for both paths, then their `names` are compared. """ - result = cmp(p1.kind, p2.kind) + result = (p1.kind > p2.kind) - (p1.kind < p2.kind) if result != 0: return result @@ -98,12 +101,12 @@ def compare_path(p1, p2): if not p2.HasField('id'): return -1 - return cmp(p1.id, p2.id) + return (p1.id > p2.id) - (p1.id < p2.id) if p2.HasField('id'): return 1 - return cmp(p1.name, p2.name) + return (p1.name > p2.name) - (p1.name < p2.name) def get_datastore(project): From 979e540a96429f4aeef99ea2ef3132b007d9fa8d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 28 Feb 2018 13:52:00 -0800 Subject: [PATCH 3/9] Remove dead _variant_comparator test code which used cmp since it doesn't seem to be used. --- sdks/python/apache_beam/io/vcfio_test.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sdks/python/apache_beam/io/vcfio_test.py b/sdks/python/apache_beam/io/vcfio_test.py index 029515fe3419..a750dc747111 100644 --- a/sdks/python/apache_beam/io/vcfio_test.py +++ b/sdks/python/apache_beam/io/vcfio_test.py @@ -70,15 +70,6 @@ def get_full_dir(): return os.path.join(os.path.dirname(__file__), '..', 'testing', 'data', 'vcf') -# Helper method for comparing variants. -def _variant_comparator(v1, v2): - if v1.reference_name == v2.reference_name: - if v1.start == v2.start: - return cmp(v1.end, v2.end) - return cmp(v1.start, v2.start) - return cmp(v1.reference_name, v2.reference_name) - - # Helper method for verifying equal count on PCollection. def _count_equals_to(expected_count): def _count_equal(actual_list): From 14b74b1510c01490ce9be8d96ae4a9b1edd89ce3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 28 Feb 2018 13:52:53 -0800 Subject: [PATCH 4/9] Rewrite cmp methods to use eq/ne/lt and total_ordering --- .../python/apache_beam/testing/test_stream.py | 53 +++++++++++++------ sdks/python/apache_beam/transforms/window.py | 34 ++++++++---- sdks/python/apache_beam/utils/timestamp.py | 42 ++++++++++++--- 3 files changed, 96 insertions(+), 33 deletions(-) diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index 8a63e7bd0561..db4cc66b1d0b 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -21,7 +21,7 @@ """ from abc import ABCMeta -from abc import abstractmethod +from functools import total_ordering from apache_beam import coders from apache_beam import core @@ -46,44 +46,65 @@ class Event(object): __metaclass__ = ABCMeta - def __cmp__(self, other): - if type(self) is not type(other): - return cmp(type(self), type(other)) - return self._typed_cmp(other) - - @abstractmethod - def _typed_cmp(self, other): - raise NotImplementedError - +@total_ordering class ElementEvent(Event): """Element-producing test stream event.""" def __init__(self, timestamped_values): self.timestamped_values = timestamped_values - def _typed_cmp(self, other): - return cmp(self.timestamped_values, other.timestamped_values) + def __eq__(self, other): + return (type(self) is type(other) and + self.timestamped_values == other.timestamped_values) + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + return self.timestamped_values < other.timestamped_values +@total_ordering class WatermarkEvent(Event): """Watermark-advancing test stream event.""" def __init__(self, new_watermark): self.new_watermark = timestamp.Timestamp.of(new_watermark) - def _typed_cmp(self, other): - return cmp(self.new_watermark, other.new_watermark) + def __eq__(self, other): + return (type(self) is type(other) and + self.new_watermark == other.new_watermark) + def __ne__(self, other): + return not self.__eq__(other) + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + return self.new_watermark < other.new_watermark + + +@total_ordering class ProcessingTimeEvent(Event): """Processing time-advancing test stream event.""" def __init__(self, advance_by): self.advance_by = timestamp.Duration.of(advance_by) - def _typed_cmp(self, other): - return cmp(self.advance_by, other.advance_by) + def __eq__(self, other): + return (type(self) is type(other) and + self.advance_by == other.advance_by) + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + return self.advance_by < other.advance_by class TestStream(PTransform): diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c250e8c6d365..d14d7ae23ce2 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -50,6 +50,7 @@ from __future__ import absolute_import import abc +from functools import total_ordering from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 @@ -177,6 +178,7 @@ def get_transformed_output_time(self, window, input_timestamp): # pylint: disab urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_WINDOWFN) +@total_ordering class BoundedWindow(object): """A window for timestamps in range (-infinity, end). @@ -190,12 +192,17 @@ def __init__(self, end): def max_timestamp(self): return self.end.predecessor() - def __cmp__(self, other): - # Order first by endpoint, then arbitrarily. - return cmp(self.end, other.end) or cmp(hash(self), hash(other)) - def __eq__(self, other): - raise NotImplementedError + return type(self) is type(other) and self.end == other.end + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if type(self) is not type(other): + return type(self) < type(other) + # Order first by endpoint, then arbitrarily. + return (self.end, hash(self)) < (other.end, hash(other)) def __hash__(self): return hash(self.end) @@ -220,7 +227,9 @@ def __hash__(self): return hash((self.start, self.end)) def __eq__(self, other): - return self.start == other.start and self.end == other.end + return type(self) == type(other) and \ + self.start == other.start and \ + self.end == other.end def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) @@ -233,6 +242,7 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) +@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -245,10 +255,16 @@ def __init__(self, value, timestamp): self.value = value self.timestamp = Timestamp.of(timestamp) - def __cmp__(self, other): + def __eq__(self, other): + return self.value == other.value and self.timestamp == other.timestamp + + def __hash_(self): + return hash(self.value) ^ hash(self.timestamp) << 1 + + def __lt__(self, other): if type(self) is not type(other): - return cmp(type(self), type(other)) - return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + return type(self) < type(other) + return (self.value, self.timestamp) < (other.value, other.timestamp) class GlobalWindow(BoundedWindow): diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index 7c41c3002f6d..91d018ffde94 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -20,15 +20,16 @@ For internal use only; no backwards-compatibility guarantees. """ -from __future__ import absolute_import -from __future__ import division +from __future__ import absolute_import, division import datetime import re +from functools import total_ordering import pytz +@total_ordering class Timestamp(object): """Represents a Unix second timestamp with microsecond granularity. @@ -142,11 +143,23 @@ def __int__(self): # Note that the returned value may have lost precision. return self.micros // 1000000 - def __cmp__(self, other): + def __eq__(self, other): # Allow comparisons between Duration and Timestamp values. - if not isinstance(other, Duration): + if not isinstance(other, (Timestamp, Duration)): other = Timestamp.of(other) - return cmp(self.micros, other.micros) + return self.micros == other.micros + + def __ne__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Timestamp.of(other) + return self.micros != other.micros + + def __lt__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Timestamp.of(other) + return self.micros < other.micros def __hash__(self): return hash(self.micros) @@ -171,6 +184,7 @@ def __mod__(self, other): MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff) +@total_ordering class Duration(object): """Represents a second duration with microsecond granularity. @@ -220,11 +234,23 @@ def __float__(self): # Note that the returned value may have lost precision. return self.micros / 1000000 - def __cmp__(self, other): + def __eq__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Duration.of(other) + return self.micros == other.micros + + def __ne__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, (Timestamp, Duration)): + other = Duration.of(other) + return self.micros != other.micros + + def __lt__(self, other): # Allow comparisons between Duration and Timestamp values. - if not isinstance(other, Timestamp): + if not isinstance(other, (Timestamp, Duration)): other = Duration.of(other) - return cmp(self.micros, other.micros) + return self.micros < other.micros def __hash__(self): return hash(self.micros) From d1419e97cd274ec30670dc36cacdc40c9fb5fcd2 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 1 Mar 2018 18:26:35 -0800 Subject: [PATCH 5/9] No longer compare windows of different types. --- sdks/python/apache_beam/transforms/window.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index d14d7ae23ce2..7a25c840a3a7 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -200,13 +200,10 @@ def __ne__(self, other): def __lt__(self, other): if type(self) is not type(other): - return type(self) < type(other) + raise TypeError("Can not compare type %s and %s" % (type(self), type(other))) # Order first by endpoint, then arbitrarily. return (self.end, hash(self)) < (other.end, hash(other)) - def __hash__(self): - return hash(self.end) - def __repr__(self): return '[?, %s)' % float(self.end) @@ -263,7 +260,7 @@ def __hash_(self): def __lt__(self, other): if type(self) is not type(other): - return type(self) < type(other) + raise TypeError("Can not compare type %s and %s" % (type(self), type(other))) return (self.value, self.timestamp) < (other.value, other.timestamp) From aeb66a6afacf3866ca295c990d4b948c93413f2d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 1 Mar 2018 18:27:18 -0800 Subject: [PATCH 6/9] Style --- sdks/python/apache_beam/transforms/window.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 7a25c840a3a7..5edb499bc0d4 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -200,7 +200,8 @@ def __ne__(self, other): def __lt__(self, other): if type(self) is not type(other): - raise TypeError("Can not compare type %s and %s" % (type(self), type(other))) + raise TypeError("Can not compare type %s and %s" % + (type(self), type(other))) # Order first by endpoint, then arbitrarily. return (self.end, hash(self)) < (other.end, hash(other)) @@ -260,7 +261,8 @@ def __hash_(self): def __lt__(self, other): if type(self) is not type(other): - raise TypeError("Can not compare type %s and %s" % (type(self), type(other))) + raise TypeError("Can not compare type %s and %s" % + (type(self), type(other))) return (self.value, self.timestamp) < (other.value, other.timestamp) From 2880bb242571bb219bb753c2726fb2395a1c7685 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 1 Mar 2018 21:16:17 -0800 Subject: [PATCH 7/9] Remove __hash_ function. --- sdks/python/apache_beam/transforms/window.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 5edb499bc0d4..c35d72530ace 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -256,9 +256,6 @@ def __init__(self, value, timestamp): def __eq__(self, other): return self.value == other.value and self.timestamp == other.timestamp - def __hash_(self): - return hash(self.value) ^ hash(self.timestamp) << 1 - def __lt__(self, other): if type(self) is not type(other): raise TypeError("Can not compare type %s and %s" % From 0bdca44634d7f377da258f9eab0b7ee1957c3a4f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 7 Mar 2018 10:15:21 -0800 Subject: [PATCH 8/9] Switch \ to ( for multi-line --- sdks/python/apache_beam/transforms/window.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c35d72530ace..2a0c5236d93a 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -225,9 +225,9 @@ def __hash__(self): return hash((self.start, self.end)) def __eq__(self, other): - return type(self) == type(other) and \ - self.start == other.start and \ - self.end == other.end + return (type(self) == type(other) and + self.start == other.start and + self.end == other.end) def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) From cd98dae297dd0cbf82ca4189972f6b6a3be6ad35 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 14 Mar 2018 12:02:40 -0700 Subject: [PATCH 9/9] Remove deprecations from the helper. --- sdks/python/apache_beam/io/gcp/datastore/v1/helper.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index 93492a3847ae..7a58fe3e2125 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -31,7 +31,6 @@ # pylint: disable=ungrouped-imports from apache_beam.internal.gcp import auth from apache_beam.utils import retry -from apache_beam.utils.annotations import deprecated # Protect against environments where datastore library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -51,7 +50,6 @@ # pylint: enable=ungrouped-imports -@deprecated(since="v2.4") def key_comparator(k1, k2): """A comparator for Datastore keys. @@ -80,7 +78,6 @@ def key_comparator(k1, k2): return 0 -@deprecated(since="v2.4") def compare_path(p1, p2): """A comparator for key path.