Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@

For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def exp_multiply(arg1, arg2):
print exp_multiply(5,6)
"""

from __future__ import absolute_import

import warnings
from functools import partial
from functools import wraps
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/annotations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# limitations under the License.
#

from __future__ import absolute_import

import unittest
import warnings

Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/utils/counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

import threading
from builtins import hex
from builtins import object
from collections import namedtuple

from apache_beam.transforms import cy_combiners
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/utils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
For experimental usage only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

from builtins import object


class BeamPlugin(object):
"""Plugin base class to be extended by dependent users such as FileSystem.
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

import platform
import subprocess

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/processes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#
"""Unit tests for the processes module."""

from __future__ import absolute_import

import unittest

import mock
Expand Down
14 changes: 6 additions & 8 deletions sdks/python/apache_beam/utils/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@
For internal use only; no backwards-compatibility guarantees.
"""

import cProfile
from __future__ import absolute_import

import cProfile # pylint: disable=bad-python3-import
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education, why does py3 lint complain here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug in pylint.
See pylint-dev/pylint#1612

import io
import logging
import os
import pstats
import sys
import tempfile
import time
import warnings
from builtins import object
from threading import Timer

if sys.version_info[0] < 3:
import StringIO
else:
from io import StringIO


class Profile(object):
"""cProfile wrapper context for saving and logging profiler results."""
Expand Down Expand Up @@ -71,7 +69,7 @@ def __exit__(self, *args):
os.remove(filename)

if self.log_results:
s = StringIO()
s = io.StringIO()
self.stats = pstats.Stats(
self.profile, stream=s).sort_stats(Profile.SORTBY)
self.stats.print_stats()
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/proto_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""For internal use only; no backwards-compatibility guarantees."""

from __future__ import absolute_import

from google.protobuf import any_pb2
from google.protobuf import struct_pb2

Expand Down
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/utils/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
needed right now use a @retry.no_retries decorator.
"""

from __future__ import absolute_import

import logging
import random
import sys
import time
import traceback
from builtins import next
from builtins import object
from builtins import range

import six
from future.utils import raise_with_traceback

from apache_beam.io.filesystem import BeamIOError

Expand Down Expand Up @@ -190,7 +194,7 @@ def wrapper(*args, **kwargs):
sleep_interval = next(retry_intervals)
except StopIteration:
# Re-raise the original exception since we finished the retries.
six.raise_from(exn, exn_traceback)
raise_with_traceback(exn, exn_traceback)

logger(
'Retry with exponential backoff: waiting for %s seconds before '
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/utils/retry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

"""Unit tests for the retry module."""

from __future__ import absolute_import

import unittest
from builtins import object

from apache_beam.utils import retry

Expand Down
36 changes: 28 additions & 8 deletions sdks/python/apache_beam/utils/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@
from __future__ import division

import datetime
import functools
import re
from builtins import object

import pytz
from six import integer_types

try: # Python 2
long # pylint: disable=long-builtin
except NameError: # Python 3
long = int


@functools.total_ordering
class Timestamp(object):
"""Represents a Unix second timestamp with microsecond granularity.

Expand All @@ -42,10 +49,10 @@ class Timestamp(object):
"""

def __init__(self, seconds=0, micros=0):
if not isinstance(seconds, integer_types + (float,)):
if not isinstance(seconds, (int, long, float)):
raise TypeError('Cannot interpret %s %s as seconds.' % (
seconds, type(seconds)))
if not isinstance(micros, integer_types + (float,)):
if not isinstance(micros, (int, long, float)):
raise TypeError('Cannot interpret %s %s as micros.' % (
micros, type(micros)))
self.micros = int(seconds * 1000000) + int(micros)
Expand All @@ -63,7 +70,7 @@ def of(seconds):
Corresponding Timestamp object.
"""

if not isinstance(seconds, integer_types + (float, Timestamp)):
if not isinstance(seconds, (int, long, float, Timestamp)):
raise TypeError('Cannot interpret %s %s as Timestamp.' % (
seconds, type(seconds)))
if isinstance(seconds, Timestamp):
Expand Down Expand Up @@ -143,11 +150,17 @@ def __int__(self):
# Note that the returned value may have lost precision.
return self.micros // 1000000

def __cmp__(self, other):
def __eq__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Duration):
other = Timestamp.of(other)
return self.micros == other.micros

def __lt__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Duration):
other = Timestamp.of(other)
return cmp(self.micros, other.micros)
return self.micros < other.micros

def __hash__(self):
return hash(self.micros)
Expand All @@ -172,6 +185,7 @@ def __mod__(self, other):
MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff)


@functools.total_ordering
class Duration(object):
"""Represents a second duration with microsecond granularity.

Expand Down Expand Up @@ -221,11 +235,17 @@ def __float__(self):
# Note that the returned value may have lost precision.
return self.micros / 1000000

def __cmp__(self, other):
def __eq__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Timestamp):
other = Duration.of(other)
return self.micros == other.micros

def __lt__(self, other):
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Timestamp):
other = Duration.of(other)
return cmp(self.micros, other.micros)
return self.micros < other.micros

def __hash__(self):
return hash(self.micros)
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/utils/urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

"""For internal use only; no backwards-compatibility guarantees."""

from __future__ import absolute_import

import abc
import inspect
from builtins import object

from google.protobuf import message
from google.protobuf import wrappers_pb2
Expand Down
3 changes: 0 additions & 3 deletions sdks/python/apache_beam/utils/windowed_value.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ cdef class WindowedValue(object):

cpdef WindowedValue with_value(self, new_value)

@staticmethod
cdef inline bint _typed_eq(WindowedValue left, WindowedValue right) except? -2

@cython.locals(wv=WindowedValue)
cpdef WindowedValue create(
object value, int64_t timestamp_micros, object windows, object pane_info=*)
33 changes: 11 additions & 22 deletions sdks/python/apache_beam/utils/windowed_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

#cython: profile=True

from __future__ import absolute_import

from builtins import object

from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp
Expand Down Expand Up @@ -178,34 +182,19 @@ def __repr__(self):
self.windows,
self.pane_info)

def __eq__(self, other):
return (type(self) == type(other)
and self.timestamp_micros == other.timestamp_micros
and self.value == other.value
and self.windows == other.windows
and self.pane_info == other.pane_info)

def __hash__(self):
return (hash(self.value) +
3 * self.timestamp_micros +
7 * hash(self.windows) +
11 * hash(self.pane_info))

# We'd rather implement __eq__, but Cython supports that via __richcmp__
# instead. Fortunately __cmp__ is understood by both (but not by Python 3).
def __cmp__(left, right): # pylint: disable=no-self-argument
"""Compares left and right for equality.

For performance reasons, doesn't actually impose an ordering
on unequal values (always returning 1).
"""
if type(left) is not type(right):
return cmp(type(left), type(right))

# TODO(robertwb): Avoid the type checks?
# Returns False (0) if equal, and True (1) if not.
return not WindowedValue._typed_eq(left, right)

@staticmethod
def _typed_eq(left, right):
return (left.timestamp_micros == right.timestamp_micros
and left.value == right.value
and left.windows == right.windows
and left.pane_info == right.pane_info)

def with_value(self, new_value):
"""Creates a new WindowedValue with the same timestamps and windows as this.

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/utils/windowed_value_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Unit tests for the windowed_value."""

from __future__ import absolute_import

import copy
import pickle
import unittest
Expand Down
1 change: 1 addition & 0 deletions sdks/python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ modules =
apache_beam/internal
apache_beam/metrics
apache_beam/options
apache_beam/utils
commands =
python --version
pip --version
Expand Down