From a8a1fc822415f2e5df19c6f6a6ab6d3b8084e5f3 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Fri, 11 May 2018 14:28:11 +0200 Subject: [PATCH] Futurize utils subpackage --- sdks/python/apache_beam/utils/__init__.py | 2 ++ sdks/python/apache_beam/utils/annotations.py | 2 ++ .../apache_beam/utils/annotations_test.py | 2 ++ sdks/python/apache_beam/utils/counters.py | 4 +++ sdks/python/apache_beam/utils/plugin.py | 4 +++ sdks/python/apache_beam/utils/processes.py | 2 ++ .../apache_beam/utils/processes_test.py | 2 ++ sdks/python/apache_beam/utils/profiler.py | 14 ++++---- sdks/python/apache_beam/utils/proto_utils.py | 2 ++ sdks/python/apache_beam/utils/retry.py | 8 +++-- sdks/python/apache_beam/utils/retry_test.py | 3 ++ sdks/python/apache_beam/utils/timestamp.py | 36 ++++++++++++++----- sdks/python/apache_beam/utils/urns.py | 3 ++ .../apache_beam/utils/windowed_value.pxd | 3 -- .../apache_beam/utils/windowed_value.py | 33 ++++++----------- .../apache_beam/utils/windowed_value_test.py | 2 ++ sdks/python/tox.ini | 1 + 17 files changed, 80 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/utils/__init__.py b/sdks/python/apache_beam/utils/__init__.py index 635c80f7c6b4..5bc12e7e282f 100644 --- a/sdks/python/apache_beam/utils/__init__.py +++ b/sdks/python/apache_beam/utils/__init__.py @@ -19,3 +19,5 @@ For internal use only; no backwards-compatibility guarantees. """ + +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/utils/annotations.py b/sdks/python/apache_beam/utils/annotations.py index 036b08287df1..6f62ce93116f 100644 --- a/sdks/python/apache_beam/utils/annotations.py +++ b/sdks/python/apache_beam/utils/annotations.py @@ -60,6 +60,8 @@ def exp_multiply(arg1, arg2): print exp_multiply(5,6) """ +from __future__ import absolute_import + import warnings from functools import partial from functools import wraps diff --git a/sdks/python/apache_beam/utils/annotations_test.py b/sdks/python/apache_beam/utils/annotations_test.py index ddd1b9ff7571..2901e3b36320 100644 --- a/sdks/python/apache_beam/utils/annotations_test.py +++ b/sdks/python/apache_beam/utils/annotations_test.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import absolute_import + import unittest import warnings diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index 46ac8ff723a0..5696bc43f807 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -23,7 +23,11 @@ For internal use only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import + import threading +from builtins import hex +from builtins import object from collections import namedtuple from apache_beam.transforms import cy_combiners diff --git a/sdks/python/apache_beam/utils/plugin.py b/sdks/python/apache_beam/utils/plugin.py index 563b93c54c7d..1425874ed3b6 100644 --- a/sdks/python/apache_beam/utils/plugin.py +++ b/sdks/python/apache_beam/utils/plugin.py @@ -20,6 +20,10 @@ For experimental usage only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import + +from builtins import object + class BeamPlugin(object): """Plugin base class to be extended by dependent users such as FileSystem. diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index e5fd9c84aaba..b0e8e3c8ba55 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -20,6 +20,8 @@ For internal use only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import + import platform import subprocess diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py index 2dd45f44dc53..123c124adc4f 100644 --- a/sdks/python/apache_beam/utils/processes_test.py +++ b/sdks/python/apache_beam/utils/processes_test.py @@ -16,6 +16,8 @@ # """Unit tests for the processes module.""" +from __future__ import absolute_import + import unittest import mock diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index 9f9c8cd16296..18a712fff642 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -20,21 +20,19 @@ For internal use only; no backwards-compatibility guarantees. """ -import cProfile +from __future__ import absolute_import + +import cProfile # pylint: disable=bad-python3-import +import io import logging import os import pstats -import sys import tempfile import time import warnings +from builtins import object from threading import Timer -if sys.version_info[0] < 3: - import StringIO -else: - from io import StringIO - class Profile(object): """cProfile wrapper context for saving and logging profiler results.""" @@ -71,7 +69,7 @@ def __exit__(self, *args): os.remove(filename) if self.log_results: - s = StringIO() + s = io.StringIO() self.stats = pstats.Stats( self.profile, stream=s).sort_stats(Profile.SORTBY) self.stats.print_stats() diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py index d7693f3f7839..5dceb174e5ab 100644 --- a/sdks/python/apache_beam/utils/proto_utils.py +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -17,6 +17,8 @@ """For internal use only; no backwards-compatibility guarantees.""" +from __future__ import absolute_import + from google.protobuf import any_pb2 from google.protobuf import struct_pb2 diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index e0267bd9df3e..9a7c152f690e 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -25,14 +25,18 @@ needed right now use a @retry.no_retries decorator. """ +from __future__ import absolute_import import logging import random import sys import time import traceback +from builtins import next +from builtins import object +from builtins import range -import six +from future.utils import raise_with_traceback from apache_beam.io.filesystem import BeamIOError @@ -190,7 +194,7 @@ def wrapper(*args, **kwargs): sleep_interval = next(retry_intervals) except StopIteration: # Re-raise the original exception since we finished the retries. - six.raise_from(exn, exn_traceback) + raise_with_traceback(exn, exn_traceback) logger( 'Retry with exponential backoff: waiting for %s seconds before ' diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py index e5f07e88b420..0217704143b7 100644 --- a/sdks/python/apache_beam/utils/retry_test.py +++ b/sdks/python/apache_beam/utils/retry_test.py @@ -17,7 +17,10 @@ """Unit tests for the retry module.""" +from __future__ import absolute_import + import unittest +from builtins import object from apache_beam.utils import retry diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index 5bed46ca39f1..e76286176c07 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -24,12 +24,19 @@ from __future__ import division import datetime +import functools import re +from builtins import object import pytz -from six import integer_types +try: # Python 2 + long # pylint: disable=long-builtin +except NameError: # Python 3 + long = int + +@functools.total_ordering class Timestamp(object): """Represents a Unix second timestamp with microsecond granularity. @@ -42,10 +49,10 @@ class Timestamp(object): """ def __init__(self, seconds=0, micros=0): - if not isinstance(seconds, integer_types + (float,)): + if not isinstance(seconds, (int, long, float)): raise TypeError('Cannot interpret %s %s as seconds.' % ( seconds, type(seconds))) - if not isinstance(micros, integer_types + (float,)): + if not isinstance(micros, (int, long, float)): raise TypeError('Cannot interpret %s %s as micros.' % ( micros, type(micros))) self.micros = int(seconds * 1000000) + int(micros) @@ -63,7 +70,7 @@ def of(seconds): Corresponding Timestamp object. """ - if not isinstance(seconds, integer_types + (float, Timestamp)): + if not isinstance(seconds, (int, long, float, Timestamp)): raise TypeError('Cannot interpret %s %s as Timestamp.' % ( seconds, type(seconds))) if isinstance(seconds, Timestamp): @@ -143,11 +150,17 @@ def __int__(self): # Note that the returned value may have lost precision. return self.micros // 1000000 - def __cmp__(self, other): + def __eq__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros == other.micros + + def __lt__(self, other): # Allow comparisons between Duration and Timestamp values. if not isinstance(other, Duration): other = Timestamp.of(other) - return cmp(self.micros, other.micros) + return self.micros < other.micros def __hash__(self): return hash(self.micros) @@ -172,6 +185,7 @@ def __mod__(self, other): MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff) +@functools.total_ordering class Duration(object): """Represents a second duration with microsecond granularity. @@ -221,11 +235,17 @@ def __float__(self): # Note that the returned value may have lost precision. return self.micros / 1000000 - def __cmp__(self, other): + def __eq__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros == other.micros + + def __lt__(self, other): # Allow comparisons between Duration and Timestamp values. if not isinstance(other, Timestamp): other = Duration.of(other) - return cmp(self.micros, other.micros) + return self.micros < other.micros def __hash__(self): return hash(self.micros) diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index ba3c6f7ae5c4..4e9c357e8e7e 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -17,8 +17,11 @@ """For internal use only; no backwards-compatibility guarantees.""" +from __future__ import absolute_import + import abc import inspect +from builtins import object from google.protobuf import message from google.protobuf import wrappers_pb2 diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd b/sdks/python/apache_beam/utils/windowed_value.pxd index 710f22a87985..8755c939d364 100644 --- a/sdks/python/apache_beam/utils/windowed_value.pxd +++ b/sdks/python/apache_beam/utils/windowed_value.pxd @@ -31,9 +31,6 @@ cdef class WindowedValue(object): cpdef WindowedValue with_value(self, new_value) - @staticmethod - cdef inline bint _typed_eq(WindowedValue left, WindowedValue right) except? -2 - @cython.locals(wv=WindowedValue) cpdef WindowedValue create( object value, int64_t timestamp_micros, object windows, object pane_info=*) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 1b3228b2e6ed..04fe94735f8e 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -27,6 +27,10 @@ #cython: profile=True +from __future__ import absolute_import + +from builtins import object + from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Timestamp @@ -178,34 +182,19 @@ def __repr__(self): self.windows, self.pane_info) + def __eq__(self, other): + return (type(self) == type(other) + and self.timestamp_micros == other.timestamp_micros + and self.value == other.value + and self.windows == other.windows + and self.pane_info == other.pane_info) + def __hash__(self): return (hash(self.value) + 3 * self.timestamp_micros + 7 * hash(self.windows) + 11 * hash(self.pane_info)) - # We'd rather implement __eq__, but Cython supports that via __richcmp__ - # instead. Fortunately __cmp__ is understood by both (but not by Python 3). - def __cmp__(left, right): # pylint: disable=no-self-argument - """Compares left and right for equality. - - For performance reasons, doesn't actually impose an ordering - on unequal values (always returning 1). - """ - if type(left) is not type(right): - return cmp(type(left), type(right)) - - # TODO(robertwb): Avoid the type checks? - # Returns False (0) if equal, and True (1) if not. - return not WindowedValue._typed_eq(left, right) - - @staticmethod - def _typed_eq(left, right): - return (left.timestamp_micros == right.timestamp_micros - and left.value == right.value - and left.windows == right.windows - and left.pane_info == right.pane_info) - def with_value(self, new_value): """Creates a new WindowedValue with the same timestamps and windows as this. diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py index 8c72c8cff9a0..5549aeee6821 100644 --- a/sdks/python/apache_beam/utils/windowed_value_test.py +++ b/sdks/python/apache_beam/utils/windowed_value_test.py @@ -17,6 +17,8 @@ """Unit tests for the windowed_value.""" +from __future__ import absolute_import + import copy import pickle import unittest diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 3b3343768529..325913fc93e2 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -102,6 +102,7 @@ modules = apache_beam/internal apache_beam/metrics apache_beam/options + apache_beam/utils commands = python --version pip --version