From a8618f11b3bbb2cadd3bf19d5bd5a2ee747189fc Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Thu, 21 Jun 2018 21:51:49 +0200 Subject: [PATCH 1/7] Futurize transforms subpackage --- sdks/python/apache_beam/pipeline_test.py | 4 +- .../python/apache_beam/transforms/__init__.py | 2 + .../apache_beam/transforms/combiners.py | 3 + .../apache_beam/transforms/combiners_test.py | 6 +- sdks/python/apache_beam/transforms/core.py | 24 +++++-- .../apache_beam/transforms/create_source.py | 13 +++- .../apache_beam/transforms/create_test.py | 12 ++-- .../apache_beam/transforms/cy_combiners.py | 7 +- .../dataflow_distribution_counter_test.py | 2 + sdks/python/apache_beam/transforms/display.py | 12 +++- .../apache_beam/transforms/display_test.py | 8 ++- .../apache_beam/transforms/ptransform.py | 6 +- .../apache_beam/transforms/ptransform_test.py | 10 ++- .../py_dataflow_distribution_counter.py | 4 ++ .../apache_beam/transforms/sideinputs.py | 2 + .../apache_beam/transforms/sideinputs_test.py | 9 ++- .../python/apache_beam/transforms/timeutil.py | 11 ++-- sdks/python/apache_beam/transforms/trigger.py | 53 ++++++++++----- .../apache_beam/transforms/trigger_test.py | 13 ++-- sdks/python/apache_beam/transforms/util.py | 10 ++- .../apache_beam/transforms/util_test.py | 4 ++ sdks/python/apache_beam/transforms/window.py | 65 ++++++++++++++++--- .../apache_beam/transforms/window_test.py | 5 +- .../transforms/write_ptransform_test.py | 2 + sdks/python/tox.ini | 1 + 25 files changed, 218 insertions(+), 70 deletions(-) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 72791fc262bc..9d4e79a9a9af 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -520,11 +520,11 @@ def test_dir(self): options = Breakfast() self.assertEquals( set(['from_dictionary', 'get_all_options', 'slices', 'style', - 'view_as', 'display_data']), + 'view_as', 'display_data', 'next']), set([attr for attr in dir(options) if not attr.startswith('_')])) self.assertEquals( set(['from_dictionary', 'get_all_options', 'style', 'view_as', - 'display_data']), + 'display_data', 'next']), set([attr for attr in dir(options.view_as(Eggs)) if not attr.startswith('_')])) diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 3c04b370cfe6..a207009f8718 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -18,6 +18,8 @@ """PTransform and descendants.""" # pylint: disable=wildcard-import +from __future__ import absolute_import + from apache_beam.transforms import combiners from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 9b0c0e81e35e..8db0fe5e14fa 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -18,9 +18,12 @@ """A library of basic combiner PTransform subclasses.""" from __future__ import absolute_import +from __future__ import division import operator import random +from builtins import object +from builtins import zip from past.builtins import long diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index f372e881024e..b0f3ebe81f3e 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -16,12 +16,16 @@ # """Unit tests for our libraries of combine PTransforms.""" +from __future__ import absolute_import +from __future__ import division import itertools import random import unittest +from builtins import range import hamcrest as hc +from future.utils import iteritems import apache_beam as beam import apache_beam.transforms.combiners as combine @@ -286,7 +290,7 @@ def match(actual): def matcher(): def match(actual): equal_to([1])([len(actual)]) - equal_to(pairs)(actual[0].iteritems()) + equal_to(pairs)(iteritems(actual[0])) return match assert_that(result, matcher()) pipeline.run() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bbd78342a7f1..9aa5ec07ab09 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -25,8 +25,9 @@ import random import re import types - -from six import string_types +from builtins import map +from builtins import object +from builtins import range from apache_beam import coders from apache_beam import pvalue @@ -82,6 +83,10 @@ 'Impulse', ] +try: + unicode # pylint: disable=unicode-builtin +except NameError: + unicode = str # Type variables T = typehints.TypeVariable('T') @@ -902,7 +907,8 @@ def with_outputs(self, *tags, **main_kw): """ main_tag = main_kw.pop('main', None) if main_kw: - raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys()) + raise ValueError('Unexpected keyword arguments: %s' % + list(main_kw.keys())) return _MultiParDo(self, tags, main_tag) def _pardo_fn_data(self): @@ -1712,6 +1718,10 @@ def __eq__(self, other): and self.timestamp_combiner == other.timestamp_combiner) return False + def __hash__(self): + return hash((type(self), self.windowfn, self.accumulation_mode, + self.timestamp_combiner)) + def is_default(self): return self._is_default @@ -1792,7 +1802,7 @@ def __init__(self, windowfn, **kwargs): accumulation_mode = kwargs.pop('accumulation_mode', None) timestamp_combiner = kwargs.pop('timestamp_combiner', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) self.windowing = Windowing( windowfn, triggerfn, accumulation_mode, timestamp_combiner) super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) @@ -1861,7 +1871,7 @@ def __init__(self, **kwargs): super(Flatten, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) def _extract_input_pvalues(self, pvalueish): try: @@ -1906,7 +1916,7 @@ def __init__(self, value): value: An object of values for the PCollection """ super(Create, self).__init__() - if isinstance(value, string_types): + if isinstance(value, (unicode, str)): raise TypeError('PTransform Create: Refusing to treat string as ' 'an iterable. (string=%r)' % value) elif isinstance(value, dict): @@ -1941,7 +1951,7 @@ def get_windowing(self, unused_inputs): @staticmethod def _create_source_from_iterable(values, coder): - return Create._create_source(map(coder.encode, values), coder) + return Create._create_source(list(map(coder.encode, values)), coder) @staticmethod def _create_source(serialized_values, coder): diff --git a/sdks/python/apache_beam/transforms/create_source.py b/sdks/python/apache_beam/transforms/create_source.py index 3d02d39463c4..aa26cebc43fd 100644 --- a/sdks/python/apache_beam/transforms/create_source.py +++ b/sdks/python/apache_beam/transforms/create_source.py @@ -15,6 +15,13 @@ # limitations under the License. # +from __future__ import absolute_import +from __future__ import division + +from builtins import map +from builtins import next +from builtins import range + from apache_beam.io import iobase from apache_beam.transforms.core import Create @@ -57,15 +64,15 @@ def split(self, desired_bundle_size, start_position=None, start_position = 0 if stop_position is None: stop_position = len(self._serialized_values) - avg_size_per_value = self._total_size / len(self._serialized_values) + avg_size_per_value = self._total_size // len(self._serialized_values) num_values_per_split = max( - int(desired_bundle_size / avg_size_per_value), 1) + int(desired_bundle_size // avg_size_per_value), 1) start = start_position while start < stop_position: end = min(start + num_values_per_split, stop_position) remaining = stop_position - end # Avoid having a too small bundle at the end. - if remaining < (num_values_per_split / 4): + if remaining < (num_values_per_split // 4): end = stop_position sub_source = Create._create_source( self._serialized_values[start:end], self._coder) diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py index b5d02acc8b11..ada36725179a 100644 --- a/sdks/python/apache_beam/transforms/create_test.py +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -16,8 +16,12 @@ # """Unit tests for the Create and _CreateSource classes.""" +from __future__ import absolute_import +from __future__ import division + import logging import unittest +from builtins import range from apache_beam import Create from apache_beam.coders import FastPrimitivesCoder @@ -33,13 +37,13 @@ def setUp(self): def test_create_transform(self): with TestPipeline() as p: - assert_that(p | Create(range(10)), equal_to(range(10))) + assert_that(p | Create(list(range(10))), equal_to(list(range(10)))) def test_create_source_read(self): self.check_read([], self.coder) self.check_read([1], self.coder) # multiple values. - self.check_read(range(10), self.coder) + self.check_read(list(range(10)), self.coder) def check_read(self, values, coder): source = Create._create_source_from_iterable(values, coder) @@ -49,7 +53,7 @@ def check_read(self, values, coder): def test_create_source_read_with_initial_splits(self): self.check_read_with_initial_splits([], self.coder, num_splits=2) self.check_read_with_initial_splits([1], self.coder, num_splits=2) - values = range(8) + values = list(range(8)) # multiple values with a single split. self.check_read_with_initial_splits(values, self.coder, num_splits=1) # multiple values with a single split with a large desired bundle size @@ -70,7 +74,7 @@ def check_read_with_initial_splits(self, values, coder, num_splits): from the split sources. """ source = Create._create_source_from_iterable(values, coder) - desired_bundle_size = source._total_size / num_splits + desired_bundle_size = source._total_size // num_splits splits = source.split(desired_bundle_size) splits_info = [ (split.source, split.start_position, split.stop_position) diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index 53a440e537e0..6805da54895f 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -15,12 +15,17 @@ # limitations under the License. # +# cython: language_level=3 + """A library of basic cythonized CombineFn subclasses. For internal use only; no backwards-compatibility guarantees. """ from __future__ import absolute_import +from __future__ import division + +from builtins import object from apache_beam.transforms import core @@ -162,7 +167,7 @@ def extract_output(self): self.sum %= 2**64 if self.sum >= INT64_MAX: self.sum -= 2**64 - return self.sum / self.count if self.count else _NAN + return self.sum // self.count if self.count else _NAN class CountCombineFn(AccumulatorCombineFn): diff --git a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py index e3d3c6e5a5a6..91a888a08369 100644 --- a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py +++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py @@ -14,6 +14,8 @@ otherwise, test on pure python module """ +from __future__ import absolute_import + import unittest from mock import Mock diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index 4206f2110b7d..e4d4b794b51f 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -41,13 +41,19 @@ import calendar import inspect import json +from builtins import object from datetime import datetime from datetime import timedelta -import six +from future.utils import iteritems __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] +try: + unicode # pylint: disable=unicode-builtin +except NameError: + unicode = str + class HasDisplayData(object): """ Basic mixin for elements that contain display data. @@ -141,7 +147,7 @@ def create_from_options(cls, pipeline_options): items = {k: (v if DisplayDataItem._get_value_type(v) is not None else str(v)) - for k, v in pipeline_options.display_data().items()} + for k, v in iteritems(pipeline_options.display_data())} return cls(pipeline_options._namespace(), items) @classmethod @@ -169,7 +175,7 @@ class DisplayDataItem(object): display item belongs to. """ typeDict = {str:'STRING', - six.text_type:'STRING', + unicode:'STRING', int:'INTEGER', float:'FLOAT', bool: 'BOOLEAN', diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 90bde8caa8c4..3b3ca2ea51a9 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -24,7 +24,6 @@ # pylint: disable=ungrouped-imports import hamcrest as hc -import six from hamcrest.core.base_matcher import BaseMatcher import apache_beam as beam @@ -35,6 +34,11 @@ # pylint: enable=ungrouped-imports +try: + unicode # pylint: disable=unicode-builtin +except NameError: + unicode = str + class DisplayDataItemMatcher(BaseMatcher): """ Matcher class for DisplayDataItems in unit tests. @@ -165,7 +169,7 @@ def test_create_list_display_data(self): def test_unicode_type_display_data(self): class MyDoFn(beam.DoFn): def display_data(self): - return {'unicode_string': six.text_type('my string'), + return {'unicode_string': unicode('my string'), 'unicode_literal_string': u'my literal string'} fn = MyDoFn() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 889372f9266a..b92504a6433e 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -43,8 +43,12 @@ class and wrapper class that allows lambda functions to be used as import os import sys import threading +from builtins import hex +from builtins import object +from builtins import zip from functools import reduce +from future.utils import itervalues from google.protobuf import message from apache_beam import error @@ -622,7 +626,7 @@ def __init__(self, fn, *args, **kwargs): super(PTransformWithSideInputs, self).__init__() if (any([isinstance(v, pvalue.PCollection) for v in args]) or - any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])): + any([isinstance(v, pvalue.PCollection) for v in itervalues(kwargs)])): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 907ee04c079a..c594e6ab28b5 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -18,12 +18,16 @@ """Unit tests for the PTransform and descendants.""" from __future__ import absolute_import +from __future__ import division from __future__ import print_function import collections import operator import re import unittest +from builtins import map +from builtins import range +from builtins import zip from functools import reduce import hamcrest as hc @@ -382,7 +386,7 @@ def test_combine_with_combine_fn(self): pipeline = TestPipeline() pcoll = pipeline | 'Start' >> beam.Create(vals) result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn()) - assert_that(result, equal_to([sum(vals) / len(vals)])) + assert_that(result, equal_to([sum(vals) // len(vals)])) pipeline.run() def test_combine_with_callable(self): @@ -413,8 +417,8 @@ def test_combine_per_key_with_combine_fn(self): pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn()) - assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), - ('b', sum(vals_2) / len(vals_2))])) + assert_that(result, equal_to([('a', sum(vals_1) // len(vals_1)), + ('b', sum(vals_2) // len(vals_2))])) pipeline.run() def test_combine_per_key_with_callable(self): diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py index fc9b4d22a8cd..980abab47c46 100644 --- a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py +++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py @@ -17,6 +17,10 @@ """For internal use only; no backwards-compatibility guarantees.""" +from __future__ import absolute_import + +from builtins import object +from builtins import range globals()['INT64_MAX'] = 2**63 - 1 globals()['INT64_MIN'] = -2**63 diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index f10cb92ed5e3..21fc919b72d1 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -26,6 +26,8 @@ from __future__ import absolute_import +from builtins import object + from apache_beam.transforms import window diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 6b93b8e9137c..1ec97c4d9c3a 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -17,9 +17,12 @@ """Unit tests for side inputs.""" +from __future__ import absolute_import + import logging import unittest +from future.utils import iteritems from nose.plugins.attrib import attr import apache_beam as beam @@ -196,7 +199,7 @@ def match(actual): [[actual_elem, actual_list, actual_dict]] = actual equal_to([expected_elem])([actual_elem]) equal_to(expected_list)(actual_list) - equal_to(expected_pairs)(actual_dict.iteritems()) + equal_to(expected_pairs)(iteritems(actual_dict)) return match assert_that(results, matcher(1, a_list, some_pairs)) @@ -286,8 +289,8 @@ def matcher(expected_elem, expected_kvs): def match(actual): [[actual_elem, actual_dict1, actual_dict2]] = actual equal_to([expected_elem])([actual_elem]) - equal_to(expected_kvs)(actual_dict1.iteritems()) - equal_to(expected_kvs)(actual_dict2.iteritems()) + equal_to(expected_kvs)(iteritems(actual_dict1)) + equal_to(expected_kvs)(iteritems(actual_dict2)) return match assert_that(results, matcher(1, some_kvs)) diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 8d63d49baad0..bf30a1313926 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -21,6 +21,9 @@ from abc import ABCMeta from abc import abstractmethod +from builtins import object + +from future.utils import with_metaclass __all__ = [ 'TimeDomain', @@ -43,11 +46,9 @@ def from_string(domain): raise ValueError('Unknown time domain: %s' % domain) -class TimestampCombinerImpl(object): +class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): """Implementation of TimestampCombiner.""" - __metaclass__ = ABCMeta - @abstractmethod def assign_output_time(self, window, input_timestamp): pass @@ -72,11 +73,9 @@ def merge(self, unused_result_window, merging_timestamps): return self.combine_all(merging_timestamps) -class DependsOnlyOnWindow(TimestampCombinerImpl): +class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): """TimestampCombinerImpl that only depends on the window.""" - __metaclass__ = ABCMeta - def combine(self, output_timestamp, other_output_timestamp): return output_timestamp diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 159b21b22257..ffa62cbb6d05 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -20,13 +20,18 @@ Triggers control when in processing time windows get emitted. """ +from __future__ import absolute_import + import collections import copy -import itertools import logging import numbers from abc import ABCMeta from abc import abstractmethod +from builtins import object + +from future.utils import iteritems +from future.utils import with_metaclass from apache_beam.coders import observable from apache_beam.portability.api import beam_runner_api_pb2 @@ -43,6 +48,10 @@ from apache_beam.utils.timestamp import TIME_GRANULARITY # AfterCount is experimental. No backwards compatibility guarantees. +try: + from itertools import izip_longest as zip_longest +except ImportError: + from itertools import zip_longest __all__ = [ 'AccumulationMode', @@ -68,14 +77,13 @@ class AccumulationMode(object): # RETRACTING = 3 -class _StateTag(object): +class _StateTag(with_metaclass(ABCMeta, object)): """An identifier used to store and retrieve typed, combinable state. The given tag must be unique for this stage. If CombineFn is None then all elements will be returned as a list, otherwise the given CombineFn will be applied (possibly incrementally and eagerly) when adding elements. """ - __metaclass__ = ABCMeta def __init__(self, tag): self.tag = tag @@ -136,12 +144,11 @@ def with_prefix(self, prefix): # pylint: disable=unused-argument # TODO(robertwb): Provisional API, Java likely to change as well. -class TriggerFn(object): +class TriggerFn(with_metaclass(ABCMeta, object)): """A TriggerFn determines when window (panes) are emitted. See https://beam.apache.org/documentation/programming-guide/#triggers """ - __metaclass__ = ABCMeta @abstractmethod def on_element(self, element, window, context): @@ -260,6 +267,9 @@ def reset(self, window, context): def __eq__(self, other): return type(self) == type(other) + def __hash__(self): + return hash(type(self)) + @staticmethod def from_runner_api(proto, context): return DefaultTrigger() @@ -446,6 +456,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.count == other.count + def __hash__(self): + return hash((type(self), self.count)) + def on_element(self, element, window, context): context.add_state(self.COUNT_TAG, 1) @@ -484,6 +497,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.underlying == other.underlying + def __hash__(self): + return hash((type(self), self.underlying)) + def on_element(self, element, window, context): self.underlying.on_element(element, window, context) @@ -512,9 +528,7 @@ def to_runner_api(self, context): subtrigger=self.underlying.to_runner_api(context))) -class _ParallelTriggerFn(TriggerFn): - - __metaclass__ = ABCMeta +class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): def __init__(self, *triggers): self.triggers = triggers @@ -526,6 +540,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash((type(self), self.triggers)) + @abstractmethod def combine_op(self, trigger_results): pass @@ -620,6 +637,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash((type(self), self.triggers)) + def on_element(self, element, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): @@ -744,14 +764,12 @@ def clear_state(self, tag): # pylint: disable=unused-argument -class SimpleState(object): +class SimpleState(with_metaclass(ABCMeta, object)): """Basic state storage interface used for triggering. Only timers must hold the watermark (by their timestamp). """ - __metaclass__ = ABCMeta - @abstractmethod def set_timer(self, window, name, time_domain, timestamp): pass @@ -863,7 +881,7 @@ def merge(self, to_be_merged, merge_result): self._persist_window_ids() def known_windows(self): - return self.window_ids.keys() + return list(self.window_ids.keys()) def get_window(self, window_id): for window, ids in self.window_ids.items(): @@ -922,11 +940,9 @@ def create_trigger_driver(windowing, return driver -class TriggerDriver(object): +class TriggerDriver(with_metaclass(ABCMeta, object)): """Breaks a series of bundle and timer firings into window (pane)s.""" - __metaclass__ = ABCMeta - @abstractmethod def process_elements(self, state, windowed_values, output_watermark): pass @@ -972,10 +988,13 @@ def __eq__(self, other): if isinstance(other, collections.Iterable): return all( a == b - for a, b in itertools.izip_longest(self, other, fillvalue=object())) + for a, b in zip_longest(self, other, fillvalue=object())) else: return NotImplemented + def __hash__(self): + return hash(self) + def __ne__(self, other): return not self == other @@ -1250,7 +1269,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): def get_earliest_hold(self): earliest_hold = MAX_TIMESTAMP - for unused_window, tagged_states in self.state.iteritems(): + for unused_window, tagged_states in iteritems(self.state): # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is # named "watermark". This is currently only true because the only place # watermark holds are set is in the GeneralTriggerDriver, where we use diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 2e672bb0cf1b..9d07e83d64a7 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -17,12 +17,17 @@ """Unit tests for the triggering classes.""" +from __future__ import absolute_import + import collections import os.path import pickle import unittest +from builtins import range +from builtins import zip import yaml +from future.utils import iteritems import apache_beam as beam from apache_beam.runners import pipeline_context @@ -382,7 +387,7 @@ def test_picklable_output(self): pickle.dumps(unpicklable) for unwindowed in driver.process_elements(None, unpicklable, None): self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value, - range(10)) + list(range(10))) class RunnerApiTest(unittest.TestCase): @@ -421,12 +426,12 @@ def format_result(k_v): | beam.GroupByKey() | beam.Map(format_result)) assert_that(result, equal_to( - { + iteritems({ 'A-5': {1, 2, 3, 4, 5}, # A-10, A-11 never emitted due to AfterCount(3) never firing. 'B-4': {6, 7, 8, 9}, 'B-3': {10, 15, 16}, - }.iteritems())) + }))) class TranscriptTest(unittest.TestCase): @@ -556,7 +561,7 @@ def fire_timers(): for line in spec['transcript']: - action, params = line.items()[0] + action, params = list(line.items())[0] if action != 'expect': # Fail if we have output that was not expected in the transcript. diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 07cab5453510..e84b56b2cd4a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -19,11 +19,17 @@ """ from __future__ import absolute_import +from __future__ import division import collections import contextlib import random import time +from builtins import object +from builtins import range +from builtins import zip + +from future.utils import itervalues from apache_beam import typehints from apache_beam.metrics import Metrics @@ -114,12 +120,12 @@ def __init__(self, **kwargs): super(CoGroupByKey, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) def _extract_input_pvalues(self, pvalueish): try: # If this works, it's a dict. - return pvalueish, tuple(pvalueish.viewvalues()) + return pvalueish, tuple(itervalues(pvalueish)) except AttributeError: pcolls = tuple(pvalueish) return pcolls, pcolls diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index d834a1c5efea..6cec4a5bf361 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -17,9 +17,13 @@ """Unit tests for the transform.util classes.""" +from __future__ import absolute_import + import logging import time import unittest +from builtins import object +from builtins import range import apache_beam as beam from apache_beam.coders import coders diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 5bc047b48c7a..60c117e9dcd1 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -50,7 +50,11 @@ from __future__ import absolute_import import abc +from builtins import object +from builtins import range +from functools import total_ordering +from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 from past.builtins import cmp @@ -109,11 +113,9 @@ def get_impl(timestamp_combiner, window_fn): raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner) -class WindowFn(urns.RunnerApiFn): +class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): """An abstract windowing function defining a basic assign and merge.""" - __metaclass__ = abc.ABCMeta - class AssignContext(object): """Context passed to WindowFn.assign().""" @@ -191,15 +193,41 @@ def __init__(self, end): def max_timestamp(self): return self.end.predecessor() - def __cmp__(self, other): + def cmp(self, other): # Order first by endpoint, then arbitrarily. - return cmp(self.end, other.end) or cmp(hash(self), hash(other)) + end_cmp = (self.end > other.end) - (self.end < other.end) + hash_cmp = (hash(self) > hash(other)) - (hash(self) < hash(other)) + return end_cmp or hash_cmp def __eq__(self, other): - raise NotImplementedError + return self.cmp(other) == 0 + + def __ne__(self, other): + return self.cmp(other) != 0 + + def __lt__(self, other): + return self.cmp(other) < 0 + + def __le__(self, other): + return self.cmp(other) <= 0 + + def __gt__(self, other): + return self.cmp(other) > 0 + + def __ge__(self, other): + return self.cmp(other) >= 0 + + def __hash__(self): + return hash(self) + + # def __lt__(self, other): + # if self.end == other.end: + # return hash(self) < hash(other) + # else: + # return self.end < other.end def __hash__(self): - return hash(self.end) + return hash(self) def __repr__(self): return '[?, %s)' % float(self.end) @@ -234,6 +262,7 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) +@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -246,10 +275,17 @@ def __init__(self, value, timestamp): self.value = value self.timestamp = Timestamp.of(timestamp) - def __cmp__(self, other): + def __eq__(self, other): + return (type(self) == type(other)) and (self.value == other.value) and \ + (self.timestamp == other.timestamp) + + def __hash__(self): + return hash((type(self), self.value, self.timestamp)) + + def __lt__(self, other): if type(self) is not type(other): - return cmp(type(self), type(other)) - return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + return type(self) < type(other) + return (self.value, self.timestamp) < (other.value, other.timestamp) class GlobalWindow(BoundedWindow): @@ -348,6 +384,9 @@ def __eq__(self, other): if type(self) == type(other) == FixedWindows: return self.size == other.size and self.offset == other.offset + def __hash__(self): + return hash((type(self), self.size, self.offset)) + def __ne__(self, other): return not self == other @@ -407,6 +446,9 @@ def __eq__(self, other): and self.offset == other.offset and self.period == other.period) + def __hash__(self): + return hash((type(self), self.offset, self.period)) + def to_runner_api_parameter(self, context): return (common_urns.sliding_windows.urn, standard_window_fns_pb2.SlidingWindowsPayload( @@ -474,6 +516,9 @@ def __eq__(self, other): if type(self) == type(other) == Sessions: return self.gap_size == other.gap_size + def __hash__(self): + return hash((type(self), self.gap_size)) + def to_runner_api_parameter(self, context): return (common_urns.session_windows.urn, standard_window_fns_pb2.SessionsPayload( diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 7c1d4e99f5e3..77ab47e3dd8b 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -16,8 +16,11 @@ # """Unit tests for the windowing classes.""" +from __future__ import absolute_import +from __future__ import division import unittest +from builtins import range from apache_beam.runners import pipeline_context from apache_beam.testing.test_pipeline import TestPipeline @@ -236,7 +239,7 @@ def test_timestamped_with_combiners(self): # We add a 'key' to each value representing the index of the # window. This is important since there is no guarantee of # order for the elements of a PCollection. - | Map(lambda v: (v / 5, v))) + | Map(lambda v: (v // 5, v))) # Sum all elements associated with a key and window. Although it # is called CombinePerKey it is really CombinePerKeyAndWindow the # same way GroupByKey is really GroupByKeyAndWindow. diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index bf4941a5d5de..a8f56fd103b4 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -16,6 +16,8 @@ # """Unit tests for the write transform.""" +from __future__ import absolute_import + import logging import unittest diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index de2352aafe99..da4b00cafc17 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -116,6 +116,7 @@ modules = apache_beam/testing apache_beam/tools apache_beam/typehints + apache_beam/transforms commands = python --version pip --version From 924c44771ecae44b43f26c7c0d65e039c64344da Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Sat, 7 Jul 2018 13:58:00 +0200 Subject: [PATCH 2/7] incorporated all feedback for futurize transforms subpackage --- .../apache_beam/transforms/combiners_test.py | 5 ++--- sdks/python/apache_beam/transforms/core.py | 16 +++++++-------- sdks/python/apache_beam/transforms/display.py | 9 ++------- .../apache_beam/transforms/display_test.py | 6 +----- .../apache_beam/transforms/ptransform.py | 3 +-- .../apache_beam/transforms/sideinputs_test.py | 7 +++---- sdks/python/apache_beam/transforms/trigger.py | 4 ++-- .../apache_beam/transforms/trigger_test.py | 5 ++--- .../apache_beam/transforms/userstate.py | 1 + .../apache_beam/transforms/userstate_test.py | 1 + sdks/python/apache_beam/transforms/util.py | 2 +- sdks/python/apache_beam/transforms/window.py | 20 ++++++------------- 12 files changed, 30 insertions(+), 49 deletions(-) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index b0f3ebe81f3e..a768231ec6e1 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -22,10 +22,9 @@ import itertools import random import unittest -from builtins import range import hamcrest as hc -from future.utils import iteritems +from future.builtins import range import apache_beam as beam import apache_beam.transforms.combiners as combine @@ -290,7 +289,7 @@ def match(actual): def matcher(): def match(actual): equal_to([1])([len(actual)]) - equal_to(pairs)(iteritems(actual[0])) + equal_to(pairs)(actual[0].items()) return match assert_that(result, matcher()) pipeline.run() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9aa5ec07ab09..929b1481e97a 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -21,14 +21,16 @@ import copy import inspect -import itertools import random import re import types +from builtins import filter from builtins import map from builtins import object from builtins import range +from past.builtins import unicode + from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints @@ -83,11 +85,6 @@ 'Impulse', ] -try: - unicode # pylint: disable=unicode-builtin -except NameError: - unicode = str - # Type variables T = typehints.TypeVariable('T') K = typehints.TypeVariable('K') @@ -296,6 +293,9 @@ def __eq__(self, other): return self.param_id == other.param_id return False + def __hash__(self): + return hash((type(self), self.param_id)) + def __repr__(self): return self.param_id @@ -703,7 +703,7 @@ def merge_accumulators(self, accumulators, *args, **kwargs): class ReiterableNonEmptyAccumulators(object): def __iter__(self): - return itertools.ifilter(filter_fn, accumulators) + return filter(filter_fn, accumulators) # It's (weakly) assumed that self._fn is associative. return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs) @@ -1916,7 +1916,7 @@ def __init__(self, value): value: An object of values for the PCollection """ super(Create, self).__init__() - if isinstance(value, (unicode, str)): + if isinstance(value, (unicode, str, bytes)): raise TypeError('PTransform Create: Refusing to treat string as ' 'an iterable. (string=%r)' % value) elif isinstance(value, dict): diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index e4d4b794b51f..ce10174e00da 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -45,15 +45,10 @@ from datetime import datetime from datetime import timedelta -from future.utils import iteritems +from past.builtins import unicode __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] -try: - unicode # pylint: disable=unicode-builtin -except NameError: - unicode = str - class HasDisplayData(object): """ Basic mixin for elements that contain display data. @@ -147,7 +142,7 @@ def create_from_options(cls, pipeline_options): items = {k: (v if DisplayDataItem._get_value_type(v) is not None else str(v)) - for k, v in iteritems(pipeline_options.display_data())} + for k, v in pipeline_options.display_data().items()} return cls(pipeline_options._namespace(), items) @classmethod diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 3b3ca2ea51a9..bdaade68fa0f 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -25,6 +25,7 @@ # pylint: disable=ungrouped-imports import hamcrest as hc from hamcrest.core.base_matcher import BaseMatcher +from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions @@ -34,11 +35,6 @@ # pylint: enable=ungrouped-imports -try: - unicode # pylint: disable=unicode-builtin -except NameError: - unicode = str - class DisplayDataItemMatcher(BaseMatcher): """ Matcher class for DisplayDataItems in unit tests. diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index b92504a6433e..7a53fbe25b0f 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -48,7 +48,6 @@ class and wrapper class that allows lambda functions to be used as from builtins import zip from functools import reduce -from future.utils import itervalues from google.protobuf import message from apache_beam import error @@ -626,7 +625,7 @@ def __init__(self, fn, *args, **kwargs): super(PTransformWithSideInputs, self).__init__() if (any([isinstance(v, pvalue.PCollection) for v in args]) or - any([isinstance(v, pvalue.PCollection) for v in itervalues(kwargs)])): + any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 1ec97c4d9c3a..f9c9ae93d625 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -22,7 +22,6 @@ import logging import unittest -from future.utils import iteritems from nose.plugins.attrib import attr import apache_beam as beam @@ -199,7 +198,7 @@ def match(actual): [[actual_elem, actual_list, actual_dict]] = actual equal_to([expected_elem])([actual_elem]) equal_to(expected_list)(actual_list) - equal_to(expected_pairs)(iteritems(actual_dict)) + equal_to(expected_pairs)(actual_dict.items()) return match assert_that(results, matcher(1, a_list, some_pairs)) @@ -289,8 +288,8 @@ def matcher(expected_elem, expected_kvs): def match(actual): [[actual_elem, actual_dict1, actual_dict2]] = actual equal_to([expected_elem])([actual_elem]) - equal_to(expected_kvs)(iteritems(actual_dict1)) - equal_to(expected_kvs)(iteritems(actual_dict2)) + equal_to(expected_kvs)(actual_dict1.items()) + equal_to(expected_kvs)(actual_dict2.items()) return match assert_that(results, matcher(1, some_kvs)) diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index ffa62cbb6d05..36062bf1531a 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -881,7 +881,7 @@ def merge(self, to_be_merged, merge_result): self._persist_window_ids() def known_windows(self): - return list(self.window_ids.keys()) + return list(self.window_ids) def get_window(self, window_id): for window, ids in self.window_ids.items(): @@ -993,7 +993,7 @@ def __eq__(self, other): return NotImplemented def __hash__(self): - return hash(self) + return hash(tuple(self)) def __ne__(self, other): return not self == other diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 9d07e83d64a7..034abae65c8a 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -27,7 +27,6 @@ from builtins import zip import yaml -from future.utils import iteritems import apache_beam as beam from apache_beam.runners import pipeline_context @@ -426,12 +425,12 @@ def format_result(k_v): | beam.GroupByKey() | beam.Map(format_result)) assert_that(result, equal_to( - iteritems({ + { 'A-5': {1, 2, 3, 4, 5}, # A-10, A-11 never emitted due to AfterCount(3) never firing. 'B-4': {6, 7, 8, 9}, 'B-3': {10, 15, 16}, - }))) + }.items())) class TranscriptTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py index 6a5fd581bb70..0f99da246a5c 100644 --- a/sdks/python/apache_beam/transforms/userstate.py +++ b/sdks/python/apache_beam/transforms/userstate.py @@ -23,6 +23,7 @@ from __future__ import absolute_import import types +from builtins import object from apache_beam.coders import Coder from apache_beam.transforms.timeutil import TimeDomain diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 8dbc9ce5e77a..b891e6281782 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -16,6 +16,7 @@ # """Unit tests for the Beam State and Timer API interfaces.""" +from __future__ import absolute_import import unittest diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index e84b56b2cd4a..9bac2d6fed23 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -279,7 +279,7 @@ def div_keys(kv1_kv2): pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), key=div_keys) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. - threshold = 2 * len(pairs) / 3 + threshold = 2 * len(pairs) // 3 self._data = ( list(sum(pairs[threshold:], ())) + [((x1 + x2) / 2.0, (t1 + t2) / 2.0) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 60c117e9dcd1..3e88acfcf350 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -200,7 +200,7 @@ def cmp(self, other): return end_cmp or hash_cmp def __eq__(self, other): - return self.cmp(other) == 0 + raise NotImplementedError def __ne__(self, other): return self.cmp(other) != 0 @@ -218,16 +218,7 @@ def __ge__(self, other): return self.cmp(other) >= 0 def __hash__(self): - return hash(self) - - # def __lt__(self, other): - # if self.end == other.end: - # return hash(self) < hash(other) - # else: - # return self.end < other.end - - def __hash__(self): - return hash(self) + return hash(self.end) def __repr__(self): return '[?, %s)' % float(self.end) @@ -276,15 +267,16 @@ def __init__(self, value, timestamp): self.timestamp = Timestamp.of(timestamp) def __eq__(self, other): - return (type(self) == type(other)) and (self.value == other.value) and \ - (self.timestamp == other.timestamp) + return (type(self) == type(other) + and self.value == other.value + and self.timestamp == other.timestamp) def __hash__(self): return hash((type(self), self.value, self.timestamp)) def __lt__(self, other): if type(self) is not type(other): - return type(self) < type(other) + return hash(type(self)) < hash(type(other)) return (self.value, self.timestamp) < (other.value, other.timestamp) From 6b80c40abe176f32701258b89eab144214bc092a Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Thu, 12 Jul 2018 12:19:12 +0200 Subject: [PATCH 3/7] added ifilter for PY2 --- sdks/python/apache_beam/transforms/core.py | 6 +++++- sdks/python/apache_beam/transforms/window.py | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 929b1481e97a..5e9f6b87dbc7 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -24,7 +24,6 @@ import random import re import types -from builtins import filter from builtins import map from builtins import object from builtins import range @@ -65,6 +64,11 @@ from apache_beam.typehints.typehints import is_consistent_with from apache_beam.utils import urns +try: + from itertools import ifilter as filter +except ImportError: + pass + __all__ = [ 'DoFn', 'CombineFn', diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 3e88acfcf350..14d169ff380f 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -57,7 +57,6 @@ from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 -from past.builtins import cmp from apache_beam.coders import coders from apache_beam.portability import common_urns From 0092d4a72307e5f45577c3b73690fac4e15c1016 Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Fri, 20 Jul 2018 18:12:23 +0200 Subject: [PATCH 4/7] remove total ordering and cmp method --- sdks/python/apache_beam/transforms/core.py | 12 ++--- sdks/python/apache_beam/transforms/trigger.py | 5 +- sdks/python/apache_beam/transforms/window.py | 47 ++++++++++++------- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 5e9f6b87dbc7..71918347e6d4 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -28,6 +28,7 @@ from builtins import object from builtins import range +from future.builtins import filter from past.builtins import unicode from apache_beam import coders @@ -64,11 +65,6 @@ from apache_beam.typehints.typehints import is_consistent_with from apache_beam.utils import urns -try: - from itertools import ifilter as filter -except ImportError: - pass - __all__ = [ 'DoFn', 'CombineFn', @@ -912,7 +908,7 @@ def with_outputs(self, *tags, **main_kw): main_tag = main_kw.pop('main', None) if main_kw: raise ValueError('Unexpected keyword arguments: %s' % - list(main_kw.keys())) + list(main_kw)) return _MultiParDo(self, tags, main_tag) def _pardo_fn_data(self): @@ -1806,7 +1802,7 @@ def __init__(self, windowfn, **kwargs): accumulation_mode = kwargs.pop('accumulation_mode', None) timestamp_combiner = kwargs.pop('timestamp_combiner', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) self.windowing = Windowing( windowfn, triggerfn, accumulation_mode, timestamp_combiner) super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) @@ -1875,7 +1871,7 @@ def __init__(self, **kwargs): super(Flatten, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) def _extract_input_pvalues(self, pvalueish): try: diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 36062bf1531a..1f1ca4aa20f5 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -30,6 +30,7 @@ from abc import abstractmethod from builtins import object +from future.moves.itertools import zip_longest from future.utils import iteritems from future.utils import with_metaclass @@ -48,10 +49,6 @@ from apache_beam.utils.timestamp import TIME_GRANULARITY # AfterCount is experimental. No backwards compatibility guarantees. -try: - from itertools import izip_longest as zip_longest -except ImportError: - from itertools import zip_longest __all__ = [ 'AccumulationMode', diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 14d169ff380f..1963f11296e2 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -52,11 +52,11 @@ import abc from builtins import object from builtins import range -from functools import total_ordering from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 +from past.builtins import cmp from apache_beam.coders import coders from apache_beam.portability import common_urns @@ -192,32 +192,31 @@ def __init__(self, end): def max_timestamp(self): return self.end.predecessor() - def cmp(self, other): - # Order first by endpoint, then arbitrarily. - end_cmp = (self.end > other.end) - (self.end < other.end) - hash_cmp = (hash(self) > hash(other)) - (hash(self) < hash(other)) - return end_cmp or hash_cmp - def __eq__(self, other): raise NotImplementedError def __ne__(self, other): - return self.cmp(other) != 0 + return (cmp(self.end, other.end) + or cmp(hash(self), hash(other))) != 0 def __lt__(self, other): - return self.cmp(other) < 0 + return (cmp(self.end, other.end) + or cmp(hash(self), hash(other))) < 0 def __le__(self, other): - return self.cmp(other) <= 0 + return (cmp(self.end, other.end) + or cmp(hash(self), hash(other))) <= 0 def __gt__(self, other): - return self.cmp(other) > 0 + return (cmp(self.end, other.end) + or cmp(hash(self), hash(other))) > 0 def __ge__(self, other): - return self.cmp(other) >= 0 + return (cmp(self.end, other.end) + or cmp(hash(self), hash(other))) >= 0 def __hash__(self): - return hash(self.end) + raise NotImplementedError def __repr__(self): return '[?, %s)' % float(self.end) @@ -252,7 +251,6 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) -@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -273,10 +271,25 @@ def __eq__(self, other): def __hash__(self): return hash((type(self), self.value, self.timestamp)) + def __ne__(self, other): + return (cmp(type(self), type(other)) or cmp( + (self.value, self.timestamp), (other.value, other.timestamp))) != 0 + def __lt__(self, other): - if type(self) is not type(other): - return hash(type(self)) < hash(type(other)) - return (self.value, self.timestamp) < (other.value, other.timestamp) + return (cmp(type(self), type(other)) or cmp( + (self.value, self.timestamp), (other.value, other.timestamp))) < 0 + + def __le__(self, other): + return (cmp(type(self), type(other)) or cmp( + (self.value, self.timestamp), (other.value, other.timestamp))) <= 0 + + def __gt__(self, other): + return (cmp(type(self), type(other)) or cmp( + (self.value, self.timestamp), (other.value, other.timestamp))) > 0 + + def __ge__(self, other): + return (cmp(type(self), type(other)) or cmp( + (self.value, self.timestamp), (other.value, other.timestamp))) >= 0 class GlobalWindow(BoundedWindow): From 94cfda032cf82eabe7513397794cd2ce456bcc54 Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Wed, 25 Jul 2018 01:46:54 +0200 Subject: [PATCH 5/7] added old_div to fix performance regression in PY2 --- sdks/python/apache_beam/transforms/util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 9bac2d6fed23..dbd0f709d6cb 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -30,6 +30,7 @@ from builtins import zip from future.utils import itervalues +from past.utils import old_div from apache_beam import typehints from apache_beam.metrics import Metrics @@ -274,7 +275,7 @@ def _thin_data(self): def div_keys(kv1_kv2): (x1, _), (x2, _) = kv1_kv2 - return x2 / x1 + return old_div(x2, x1) # TODO(BEAM-4858) pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), key=div_keys) From d8480910fd28d75f63fbdbd96cbb35baea80ed57 Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Fri, 27 Jul 2018 21:25:34 +0200 Subject: [PATCH 6/7] added __ne__ and total_ordering --- sdks/python/apache_beam/transforms/window.py | 67 +++++++++++--------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 1963f11296e2..9ddec640bbc4 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -52,11 +52,11 @@ import abc from builtins import object from builtins import range +from functools import total_ordering from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 -from past.builtins import cmp from apache_beam.coders import coders from apache_beam.portability import common_urns @@ -196,24 +196,28 @@ def __eq__(self, other): raise NotImplementedError def __ne__(self, other): - return (cmp(self.end, other.end) - or cmp(hash(self), hash(other))) != 0 + # Order first by endpoint, then arbitrarily + return self.end != other.end or hash(self) != hash(other) def __lt__(self, other): - return (cmp(self.end, other.end) - or cmp(hash(self), hash(other))) < 0 + if self.end != other.end: + return self.end < other.end + return hash(self) < hash(other) def __le__(self, other): - return (cmp(self.end, other.end) - or cmp(hash(self), hash(other))) <= 0 + if self.end != other.end: + return self.end <= other.end + return hash(self) <= hash(other) def __gt__(self, other): - return (cmp(self.end, other.end) - or cmp(hash(self), hash(other))) > 0 + if self.end != other.end: + return self.end > other.end + return hash(self) > hash(other) def __ge__(self, other): - return (cmp(self.end, other.end) - or cmp(hash(self), hash(other))) >= 0 + if self.end != other.end: + return self.end >= other.end + return hash(self) >= hash(other) def __hash__(self): raise NotImplementedError @@ -235,10 +239,15 @@ def __init__(self, start, end): self.start = Timestamp.of(start) def __hash__(self): - return hash((self.start, self.end)) + return hash((self.start, self.end, type(self))) def __eq__(self, other): - return self.start == other.start and self.end == other.end + return (self.start == other.start + and self.end == other.end + and type(self) == type(other)) + + def __ne__(self, other): + return not self == other def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) @@ -251,6 +260,7 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) +@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -272,24 +282,14 @@ def __hash__(self): return hash((type(self), self.value, self.timestamp)) def __ne__(self, other): - return (cmp(type(self), type(other)) or cmp( - (self.value, self.timestamp), (other.value, other.timestamp))) != 0 + return not self == other def __lt__(self, other): - return (cmp(type(self), type(other)) or cmp( - (self.value, self.timestamp), (other.value, other.timestamp))) < 0 - - def __le__(self, other): - return (cmp(type(self), type(other)) or cmp( - (self.value, self.timestamp), (other.value, other.timestamp))) <= 0 - - def __gt__(self, other): - return (cmp(type(self), type(other)) or cmp( - (self.value, self.timestamp), (other.value, other.timestamp))) > 0 - - def __ge__(self, other): - return (cmp(type(self), type(other)) or cmp( - (self.value, self.timestamp), (other.value, other.timestamp))) >= 0 + if type(self) != type(other): + return type(self) < type(other) + if self.value != other.value: + return self.value < other.value + return self.timestamp < other.timestamp class GlobalWindow(BoundedWindow): @@ -315,6 +315,9 @@ def __eq__(self, other): # Global windows are always and only equal to each other. return self is other or type(self) is type(other) + def __ne__(self, other): + return not self == other + class NonMergingWindowFn(WindowFn): @@ -450,6 +453,9 @@ def __eq__(self, other): and self.offset == other.offset and self.period == other.period) + def __ne__(self, other): + return not self == other + def __hash__(self): return hash((type(self), self.offset, self.period)) @@ -520,6 +526,9 @@ def __eq__(self, other): if type(self) == type(other) == Sessions: return self.gap_size == other.gap_size + def __ne__(self, other): + return not self == other + def __hash__(self): return hash((type(self), self.gap_size)) From a224c58e8499b7367385ffd2b2a83d63b0bbb044 Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Fri, 3 Aug 2018 09:43:19 +0200 Subject: [PATCH 7/7] remove type from __hash__ --- sdks/python/apache_beam/transforms/core.py | 5 ++--- sdks/python/apache_beam/transforms/cy_combiners.py | 2 +- sdks/python/apache_beam/transforms/trigger.py | 8 ++++---- sdks/python/apache_beam/transforms/window.py | 12 ++++++------ 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 71918347e6d4..fa867e5231d1 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -294,7 +294,7 @@ def __eq__(self, other): return False def __hash__(self): - return hash((type(self), self.param_id)) + return hash(self.param_id) def __repr__(self): return self.param_id @@ -1672,7 +1672,6 @@ def expand(self, pcoll): class Windowing(object): - def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, timestamp_combiner=None): global AccumulationMode, DefaultTrigger # pylint: disable=global-variable-not-assigned @@ -1719,7 +1718,7 @@ def __eq__(self, other): return False def __hash__(self): - return hash((type(self), self.windowfn, self.accumulation_mode, + return hash((self.windowfn, self.accumulation_mode, self.timestamp_combiner)) def is_default(self): diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index 6805da54895f..2234ef98d871 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -263,7 +263,7 @@ def merge(self, accumulators): self.count += accumulator.count def extract_output(self): - return self.sum / self.count if self.count else _NAN + return self.sum // self.count if self.count else _NAN class SumFloatFn(AccumulatorCombineFn): diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 1f1ca4aa20f5..c185a5222184 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -454,7 +454,7 @@ def __eq__(self, other): return type(self) == type(other) and self.count == other.count def __hash__(self): - return hash((type(self), self.count)) + return hash(self.count) def on_element(self, element, window, context): context.add_state(self.COUNT_TAG, 1) @@ -495,7 +495,7 @@ def __eq__(self, other): return type(self) == type(other) and self.underlying == other.underlying def __hash__(self): - return hash((type(self), self.underlying)) + return hash(self.underlying) def on_element(self, element, window, context): self.underlying.on_element(element, window, context) @@ -538,7 +538,7 @@ def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers def __hash__(self): - return hash((type(self), self.triggers)) + return hash(self.triggers) @abstractmethod def combine_op(self, trigger_results): @@ -635,7 +635,7 @@ def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers def __hash__(self): - return hash((type(self), self.triggers)) + return hash(self.triggers) def on_element(self, element, window, context): ix = context.get_state(self.INDEX_TAG) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 9ddec640bbc4..067227bb3f87 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -239,7 +239,7 @@ def __init__(self, start, end): self.start = Timestamp.of(start) def __hash__(self): - return hash((self.start, self.end, type(self))) + return hash((self.start, self.end)) def __eq__(self, other): return (self.start == other.start @@ -279,14 +279,14 @@ def __eq__(self, other): and self.timestamp == other.timestamp) def __hash__(self): - return hash((type(self), self.value, self.timestamp)) + return hash((self.value, self.timestamp)) def __ne__(self, other): return not self == other def __lt__(self, other): if type(self) != type(other): - return type(self) < type(other) + return type(self).__name__ < type(other).__name__ if self.value != other.value: return self.value < other.value return self.timestamp < other.timestamp @@ -392,7 +392,7 @@ def __eq__(self, other): return self.size == other.size and self.offset == other.offset def __hash__(self): - return hash((type(self), self.size, self.offset)) + return hash((self.size, self.offset)) def __ne__(self, other): return not self == other @@ -457,7 +457,7 @@ def __ne__(self, other): return not self == other def __hash__(self): - return hash((type(self), self.offset, self.period)) + return hash((self.offset, self.period)) def to_runner_api_parameter(self, context): return (common_urns.sliding_windows.urn, @@ -530,7 +530,7 @@ def __ne__(self, other): return not self == other def __hash__(self): - return hash((type(self), self.gap_size)) + return hash(self.gap_size) def to_runner_api_parameter(self, context): return (common_urns.session_windows.urn,