From a9ce8d645c75aaceb8879ff5ea867b345e6baeaa Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 11:18:26 -0700 Subject: [PATCH 01/27] Fix unicode imports and NoneType --- sdks/python/apache_beam/coders/coder_impl.py | 6 +++--- sdks/python/apache_beam/coders/coders.py | 7 ++++++- sdks/python/apache_beam/coders/typecoders.py | 2 ++ .../apache_beam/examples/cookbook/datastore_wordcount.py | 2 ++ sdks/python/apache_beam/examples/snippets/snippets.py | 2 ++ sdks/python/apache_beam/examples/streaming_wordcount.py | 1 + sdks/python/apache_beam/examples/windowed_wordcount.py | 2 ++ sdks/python/apache_beam/examples/wordcount.py | 2 ++ sdks/python/apache_beam/examples/wordcount_debugging.py | 2 ++ sdks/python/apache_beam/examples/wordcount_fnapi.py | 1 + sdks/python/apache_beam/examples/wordcount_minimal.py | 2 ++ sdks/python/apache_beam/io/gcp/datastore/v1/helper.py | 2 ++ sdks/python/apache_beam/io/gcp/pubsub.py | 3 +++ sdks/python/apache_beam/io/gcp/pubsub_test.py | 2 ++ .../apache_beam/runners/dataflow/internal/apiclient.py | 2 ++ sdks/python/apache_beam/runners/direct/direct_runner.py | 2 ++ sdks/python/apache_beam/transforms/display.py | 3 +++ sdks/python/apache_beam/transforms/display_test.py | 2 ++ sdks/python/apache_beam/typehints/opcodes.py | 2 ++ 19 files changed, 43 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 172ee74d4c83..8e0565a59756 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -28,14 +28,14 @@ """ from __future__ import absolute_import -from types import NoneType - from apache_beam.coders import observable from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Timestamp +from past.builtins import unicode + # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import InputStream as create_InputStream @@ -264,7 +264,7 @@ def get_estimated_size_and_observables(self, value, nested=False): def encode_to_stream(self, value, stream, nested): t = type(value) - if t is NoneType: + if value is None: stream.write_byte(NONE_TYPE) elif t is int: stream.write_byte(INT_TYPE) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index f76625869879..c20eb612497b 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -22,7 +22,12 @@ from __future__ import absolute_import import base64 -import cPickle as pickle +import sys +if sys.version_info[0] == 2: + import cPickle as pickle +else: + import pickle as pickle + from past.builtins import unicode import google.protobuf from google.protobuf import wrappers_pb2 diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index dd071d7a9331..72fc64f31e24 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -67,6 +67,8 @@ def MakeXyzs(v): from apache_beam.coders import coders from apache_beam.typehints import typehints +from past.builtins import unicode + __all__ = ['registry'] diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 7204e3b2077a..3ecc93ec6b93 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -82,6 +82,8 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +from past.builtins import unicode + class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 3e61a1614cea..5bc1188282cc 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -40,6 +40,8 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.core import PTransform +from past.builtins import unicode + # Quiet some pylint warnings that happen because of the somewhat special # format for the code snippets. # pylint:disable=invalid-name diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index df8a99bcf35f..d4ec961c2e6c 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -31,6 +31,7 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions +from past.builtins import unicode def split_fn(lines): import re diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py index 4c7eee18fe31..ed7d20381454 100644 --- a/sdks/python/apache_beam/examples/windowed_wordcount.py +++ b/sdks/python/apache_beam/examples/windowed_wordcount.py @@ -29,6 +29,8 @@ import apache_beam as beam import apache_beam.transforms.window as window +from past.builtins import unicode + TABLE_SCHEMA = ('word:STRING, count:INTEGER, ' 'window_start:TIMESTAMP, window_end:TIMESTAMP') diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index b1c4a5e9c159..83cfa3a40c10 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -31,6 +31,8 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +from past.builtins import unicode + class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 6ff8f2653ffd..929b99ce707c 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -54,6 +54,8 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from past.builtins import unicode + class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regular expression.""" diff --git a/sdks/python/apache_beam/examples/wordcount_fnapi.py b/sdks/python/apache_beam/examples/wordcount_fnapi.py index 113968820d14..690ad65509be 100644 --- a/sdks/python/apache_beam/examples/wordcount_fnapi.py +++ b/sdks/python/apache_beam/examples/wordcount_fnapi.py @@ -38,6 +38,7 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +from past.builtins import unicode class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 390c8c04af88..cc4da53fa499 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -56,6 +56,8 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +from past.builtins import unicode + def run(argv=None): """Main entry point; defines and runs the wordcount pipeline.""" diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index b86a2fa01455..2399cec48c90 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -47,6 +47,8 @@ # pylint: enable=ungrouped-imports +from past.builtins import unicode + def key_comparator(k1, k2): """A comparator for Datastore keys. diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 98aa884c71dc..f909386d0067 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -36,6 +36,9 @@ from apache_beam.transforms import window from apache_beam.transforms.display import DisplayDataItem + +from past.builtins import unicode + __all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub'] diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 8bd9fa4f41aa..05a98f87ba91 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -41,6 +41,8 @@ pubsub = None # pylint: enable=wrong-import-order, wrong-import-position +from past.builtins import unicode + @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestReadStringsFromPubSubOverride(unittest.TestCase): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 1cf80b799021..c0863f701276 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -49,6 +49,8 @@ from apache_beam.transforms.display import DisplayData from apache_beam.utils import retry +from past.builtins import unicode + # Environment version information. It is passed to the service during a # a job submission and is used by the service to establish what features # are expected by the workers. diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index cb4eb43d7320..fb05776cd349 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -51,6 +51,8 @@ from apache_beam.transforms.core import _GroupByKeyOnly from apache_beam.transforms.ptransform import PTransform +from past.builtins import unicode + # Note that the BundleBasedDirectRunner and SwitchingDirectRunner names are # experimental and have no backwards compatibility guarantees. __all__ = ['BundleBasedDirectRunner', diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index cb7b53eb29aa..a93bda51de05 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -44,6 +44,9 @@ from datetime import datetime from datetime import timedelta + +from past.builtins import unicode + __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 5c73cf39a92f..5676cf8bbe8b 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -31,6 +31,8 @@ from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.display import HasDisplayData +from past.builtins import unicode + class DisplayDataItemMatcher(BaseMatcher): """ Matcher class for DisplayDataItems in unit tests. diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py index 7fae11b63eab..4ea9f3177d04 100644 --- a/sdks/python/apache_beam/typehints/opcodes.py +++ b/sdks/python/apache_beam/typehints/opcodes.py @@ -47,6 +47,8 @@ from .typehints import Tuple from .typehints import Union +from past.builtins import unicode + def pop_one(state, unused_arg): del state.stack[-1:] From 72106c7e765ecf10c7fcf234b43422ad84944050 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 11:31:06 -0700 Subject: [PATCH 02/27] Swap the avroio over to BytesIO --- sdks/python/apache_beam/io/avroio.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 30fc8903283c..a5be3232d9d9 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -42,7 +42,7 @@ Avro file. """ -import cStringIO +import io import os import zlib from functools import partial @@ -311,7 +311,7 @@ def _decompress_bytes(data, codec): # We take care to avoid extra copies of data while slicing large objects # by use of a buffer. result = snappy.decompress(buffer(data)[:-4]) - avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result) + avroio.BinaryDecoder(io.BytesIO(data[-4:])).check_crc32(result) return result else: raise ValueError('Unknown codec: %r', codec) @@ -321,7 +321,7 @@ def num_records(self): def records(self): decoder = avroio.BinaryDecoder( - cStringIO.StringIO(self._decompressed_block_bytes)) + io.BytesIO(self._decompressed_block_bytes)) reader = avroio.DatumReader( writers_schema=self._schema, readers_schema=self._schema) From 7379b09b0ba79070172334a454150d707e58d1b8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 11:31:24 -0700 Subject: [PATCH 03/27] Swithc apiclinet to io StringIO from StringIO StringIO --- sdks/python/apache_beam/runners/dataflow/internal/apiclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index c0863f701276..7ffc83235137 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -27,7 +27,7 @@ import re import time from datetime import datetime -from StringIO import StringIO +from io import StringIO from apitools.base.py import encoding from apitools.base.py import exceptions From 7b1d7c6d753306a9fb9285c41648c4c32fd172e8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 11:39:31 -0700 Subject: [PATCH 04/27] Switch required packages based on Python version (currently only impacting avro) and add future geq 0.16.0 --- sdks/python/setup.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index a069237e22be..c91fad5996f2 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -94,8 +94,24 @@ def get_version(): except ImportError: cythonize = lambda *args, **kwargs: [] +COMMON_PACKAGES = [ + 'crcmod>=1.7,<2.0', + 'dill==0.2.6', + 'grpcio>=1.0,<2.0', + 'httplib2>=0.8,<0.10', + 'mock>=1.0.1,<3.0.0', + 'oauth2client>=2.0.1,<4.0.0', + 'protobuf>=3.2.0,<=3.3.0', + 'pyyaml>=3.12,<4.0.0', + # Six 1.11.0 incompatible with apitools. + # TODO(BEAM-2964): Remove the upper bound. + 'six>=1.9,<1.11', + 'typing>=3.6.0,<3.7.0', + 'future>=0.16.0', +] + -REQUIRED_PACKAGES = [ +COMMON_PACKAGES = [ 'avro>=1.8.1,<2.0.0', 'crcmod>=1.7,<2.0', 'dill==0.2.6', @@ -113,6 +129,16 @@ def get_version(): 'futures>=3.1.1,<4.0.0', ] +if sys.version_info[0] >= 3: + REQUIRED_PACKAGES = [ + 'avro-python3>=1.8.2,<2.0.0', + ] + COMMON_PACKAGES +else: + REQUIRED_PACKAGES = [ + 'avro>=1.8.1,<2.0.0', + ] + COMMON_PACKAGES + + REQUIRED_SETUP_PACKAGES = [ 'nose>=1.0', ] From 10f83d7953f33250f033cafb64cdaf16ce8012bd Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 16:07:27 -0700 Subject: [PATCH 05/27] Rewrite lambda with expansion --- sdks/python/apache_beam/transforms/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 8185e64a67cf..14a5c89175f1 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -260,7 +260,7 @@ def _thin_data(self): odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else [] # Sort the pairs by how different they are. pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), - key=lambda ((x1, _1), (x2, _2)): x2 / x1) + key=lambda x1_y1_x2_y2: x1_y1_x2_y2[1][0] / x1_y1_x2_y2[0][0]) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. threshold = 2 * len(pairs) / 3 self._data = ( From 85930de068b743c0c727defea39cc0831c192a4a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 16:07:42 -0700 Subject: [PATCH 06/27] Fix items and builtin import and a long import --- sdks/python/apache_beam/__init__.py | 1 - sdks/python/apache_beam/transforms/combiners.py | 2 ++ sdks/python/apache_beam/typehints/native_type_compatibility.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index 791ebb7a342e..4c24199b6c28 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -74,7 +74,6 @@ import sys - if not (sys.version_info[0] == 2 and sys.version_info[1] == 7): raise RuntimeError( 'The Apache Beam SDK for Python is supported only on Python 2.7. ' diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index e29855e5f8fc..5696859695b2 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -36,6 +36,8 @@ from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types +from past.builtins import long + __all__ = [ 'Count', 'Mean', diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index cf07d7d922a0..0be931e8fe2b 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -161,6 +161,6 @@ def convert_to_beam_types(args): a dictionary with the same keys, and values which have been converted. """ if isinstance(args, dict): - return {k: convert_to_beam_type(v) for k, v in args.iteritems()} + return {k: convert_to_beam_type(v) for k, v in args.items()} else: return [convert_to_beam_type(v) for v in args] From 64b0c8f7a5d81b238bba12e25a7dc62568a7182d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 16:11:29 -0700 Subject: [PATCH 07/27] Switch filesystem to io.StringIO --- sdks/python/apache_beam/io/filesystem.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 09739dc94454..9601cf495c47 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -20,8 +20,8 @@ import abc import bz2 -import cStringIO import logging +import io import os import time import zlib @@ -122,7 +122,7 @@ def __init__(self, if self.readable(): self._read_size = read_size - self._read_buffer = cStringIO.StringIO() + self._read_buffer = io.StringIO() self._read_position = 0 self._read_eof = False @@ -237,7 +237,7 @@ def readline(self): if not self._decompressor: raise ValueError('decompressor not initialized') - io = cStringIO.StringIO() + io = io.StringIO() while True: # Ensure that the internal buffer has at least half the read_size. Going # with half the _read_size (as opposed to a full _read_size) to ensure From 7d1bc5cbe4d4fe857d1df6ab5d932e22acb051dc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 16:47:13 -0700 Subject: [PATCH 08/27] Fix cirular dep introduced through Py3 import changes for CreatePTransformOverride. --- sdks/python/apache_beam/runners/__init__.py | 1 + .../apache_beam/runners/dataflow/__init__.py | 1 + .../runners/dataflow/dataflow_runner.py | 15 ++++++--------- .../runners/dataflow/ptransform_overrides.py | 5 +++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py index 863e67ed6e90..30c0d0c2fb14 100644 --- a/sdks/python/apache_beam/runners/__init__.py +++ b/sdks/python/apache_beam/runners/__init__.py @@ -19,6 +19,7 @@ This package defines runners, which are used to execute a pipeline. """ +from __future__ import absolute_import from apache_beam.runners.direct.direct_runner import DirectRunner from apache_beam.runners.runner import PipelineRunner diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py index 6674ba5d9ff9..df251fcfc187 100644 --- a/sdks/python/apache_beam/runners/dataflow/__init__.py +++ b/sdks/python/apache_beam/runners/dataflow/__init__.py @@ -20,6 +20,7 @@ Anything in this package not imported here is an internal implementation detail with no backwards-compatibility guarantees. """ +from __future__ import absolute_import from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index bfec89310e9e..595ae590fb04 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -20,6 +20,7 @@ The runner will create a JSON description of the job graph and then submit it to the Dataflow Service for remote execution by a worker. """ +from __future__ import absolute_import import logging import threading @@ -72,14 +73,6 @@ class DataflowRunner(PipelineRunner): # For internal SDK use only. This should not be updated by Beam pipeline # authors. - # Imported here to avoid circular dependencies. - # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride - from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride - - _PTRANSFORM_OVERRIDES = [ - CreatePTransformOverride(), - ] - def __init__(self, cache=None): # Cache of CloudWorkflowStep protos generated while the runner # "executes" a pipeline. @@ -285,7 +278,11 @@ def run_pipeline(self, pipeline): return_context=True) # Performing configured PTransform overrides. - pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES) + # Imported here to avoid circular dependencies. + # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride + from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride + + pipeline.replace_all(CreatePTransformOverride()) # Add setup_options for all the BeamPlugin imports setup_options = pipeline._options.view_as(SetupOptions) diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index 0ce212fa31bd..0e0e0c91a873 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -16,10 +16,11 @@ # """Ptransform overrides for DataflowRunner.""" +from __future__ import absolute_import -from apache_beam.coders import typecoders -from apache_beam.pipeline import PTransformOverride +from apache_beam.coders import typecoders +from apache_beam.pipeline import * class CreatePTransformOverride(PTransformOverride): """A ``PTransformOverride`` for ``Create`` in streaming mode.""" From 86a1b2ce16e69d2ef7d9c836754607af2ee7e7b5 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 17:00:34 -0700 Subject: [PATCH 09/27] Change slow_stream to bytes --- sdks/python/apache_beam/coders/slow_stream.py | 16 ++++++++++------ sdks/python/apache_beam/coders/stream_test.py | 18 +++++++++--------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 1ab55d90f98d..24138f69f535 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -21,7 +21,7 @@ """ import struct - +from past.builtins import basestring class OutputStream(object): """For internal use only; no backwards-compatibility guarantees. @@ -32,13 +32,17 @@ def __init__(self): self.data = [] def write(self, b, nested=False): - assert isinstance(b, str) + assert (isinstance(b, basestring) or isinstance(b, bytes), + "%r is not a basestring or bytes it is a %r" % (b, type(b))) if nested: self.write_var_int64(len(b)) - self.data.append(b) + if isinstance(b, bytes): + self.data.append(b) + else: + self.data.append(b.encode("LATIN-1")) def write_byte(self, val): - self.data.append(chr(val)) + self.data.append(chr(val).encode("LATIN-1")) def write_var_int64(self, v): if v < 0: @@ -67,7 +71,7 @@ def write_bigendian_double(self, v): self.write(struct.pack('>d', v)) def get(self): - return ''.join(self.data) + return b''.join(self.data) def size(self): return len(self.data) @@ -123,7 +127,7 @@ def read_all(self, nested): def read_byte(self): self.pos += 1 - return ord(self.data[self.pos - 1]) + return self.data[self.pos - 1] def read_var_int64(self): shift = 0 diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index 15bc5eb9ba93..ad44211abaae 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -33,20 +33,20 @@ class StreamTest(unittest.TestCase): def test_read_write(self): out_s = self.OutputStream() - out_s.write('abc') - out_s.write('\0\t\n') - out_s.write('xyz', True) - out_s.write('', True) + out_s.write(b'abc') + out_s.write(b'\0\t\n') + out_s.write(b'xyz', True) + out_s.write(b'', True) in_s = self.InputStream(out_s.get()) - self.assertEquals('abc\0\t\n', in_s.read(6)) - self.assertEquals('xyz', in_s.read_all(True)) - self.assertEquals('', in_s.read_all(True)) + self.assertEquals(b'abc\0\t\n', in_s.read(6)) + self.assertEquals(b'xyz', in_s.read_all(True)) + self.assertEquals(b'', in_s.read_all(True)) def test_read_all(self): out_s = self.OutputStream() - out_s.write('abc') + out_s.write(b'abc') in_s = self.InputStream(out_s.get()) - self.assertEquals('abc', in_s.read_all(False)) + self.assertEquals(b'abc', in_s.read_all(False)) def test_read_write_byte(self): out_s = self.OutputStream() From d7de78443881664f1dcbad2fcad1fc21919b7018 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 21:40:17 -0700 Subject: [PATCH 10/27] Handle slow_stream in py2 again --- sdks/python/apache_beam/coders/slow_stream.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 24138f69f535..8dc3f75ab684 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -21,6 +21,7 @@ """ import struct +import sys from past.builtins import basestring class OutputStream(object): @@ -127,7 +128,10 @@ def read_all(self, nested): def read_byte(self): self.pos += 1 - return self.data[self.pos - 1] + if sys.version_info[0] == 3: + return self.data[self.pos - 1] + else: + return ord(self.data[self.pos - 1]) def read_var_int64(self): shift = 0 From 34ab42ce2021bc1aa3ef5d2760e710f1fe9a8bb7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 21:58:44 -0700 Subject: [PATCH 11/27] Selective fixes while running through apache_beam.examples.complete.game.game_stats_test.GameStatsTest, fix some rethrows. lambda expansion, imports, and metaclass --- sdks/python/apache_beam/coders/coder_impl.py | 2 ++ sdks/python/apache_beam/coders/coders_test.py | 2 +- .../examples/complete/game/game_stats.py | 4 +-- sdks/python/apache_beam/internal/util.py | 2 +- sdks/python/apache_beam/runners/common.py | 25 +++++++++++++++---- sdks/python/apache_beam/runners/runner.py | 6 ++++- sdks/python/apache_beam/transforms/core.py | 2 ++ .../apache_beam/transforms/ptransform.py | 2 +- .../typehints/trivial_inference.py | 5 ++++ .../python/apache_beam/typehints/typehints.py | 4 +-- 10 files changed, 41 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 8e0565a59756..b22bf141b976 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -344,6 +344,8 @@ def decode_from_stream(self, in_stream, nested): return in_stream.read_all(nested) def encode(self, value): + if (not isinstance(value, bytes)) and isinstance(value, str): + return value.encode("LATIN-1") assert isinstance(value, bytes), (value, type(value)) return value diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 705de8920d52..e539307bad1b 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -57,7 +57,7 @@ def test_str_utf8_coder(self): expected_coder = coders.BytesCoder() self.assertEqual( real_coder.encode('abc'), expected_coder.encode('abc')) - self.assertEqual('abc', real_coder.decode(real_coder.encode('abc'))) + self.assertEqual(b'abc', real_coder.decode(real_coder.encode('abc'))) # The test proto message file was generated by running the following: diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py index d8c60dd67662..4e25cd3e29ac 100644 --- a/sdks/python/apache_beam/examples/complete/game/game_stats.py +++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py @@ -229,8 +229,8 @@ def expand(self, user_scores): sum_scores # Use the derived mean total score (global_mean_score) as a side input. | 'ProcessAndFilter' >> beam.Filter( - lambda (_, score), global_mean:\ - score > global_mean * self.SCORE_WEIGHT, + lambda x_score, global_mean:\ + x_score[1] > global_mean * self.SCORE_WEIGHT, global_mean_score)) return filtered # [END abuse_detect] diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index e4f230b8eb15..e3f82a768983 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -79,7 +79,7 @@ def swapper(value): # by sorting the entries first. This will be important when putting back # PValues. new_kwargs = dict((k, swapper(v)) if isinstance(v, pvalue_classes) else (k, v) - for k, v in sorted(kwargs.iteritems())) + for k, v in sorted(kwargs.items())) return (new_args, new_kwargs, pvals) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d5ca68307aa5..f236453b9285 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -508,11 +508,26 @@ def _reraise_augmented(self, exn): except: # pylint: disable=bare-except # If anything goes wrong, construct a RuntimeError whose message # records the original exception's type and message. - new_exn = RuntimeError( - traceback.format_exception_only(type(exn), exn)[-1].strip() - + step_annotation) - new_exn._tagged_with_step = True - raise new_exn, None, original_traceback + # PEP-3134 means we can just wrap + if sys.version_info[0] == 3: + raise Exception(step_annotation) + else: + # To emulate exception chaining (not available in Python 2). + original_traceback = sys.exc_info()[2] + try: + # Attempt to construct the same kind of exception + # with an augmented message. + new_exn = type(exn)(exn.args[0] + step_annotation, *exn.args[1:]) + new_exn._tagged_with_step = True # Could raise attribute error. + except: # pylint: disable=bare-except + # If anything goes wrong, construct a RuntimeError whose message + # records the original exception's type and message. + new_exn = RuntimeError( + traceback.format_exception_only(type(exn), exn)[-1].strip() + + step_annotation) + new_exn._tagged_with_step = True + new_exn.args = exn.args + raise new_exn class OutputProcessor(object): diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 22288a301896..e7800030f167 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -23,6 +23,7 @@ import os import shelve import shutil +import sys import tempfile __all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult'] @@ -90,7 +91,10 @@ def create_runner(runner_name): if '.' in runner_name: module, runner = runner_name.rsplit('.', 1) try: - return getattr(__import__(module, {}, {}, [runner], -1), runner)() + if sys.version_info[0] >= 3: + return getattr(__import__(module, {}, {}, [runner]), runner)() + else: + return getattr(__import__(module, {}, {}, [runner], -1), runner)() except ImportError: if runner_name in _KNOWN_DATAFLOW_RUNNERS: raise ImportError( diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2d411ee75331..50582f7209bf 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -55,6 +55,8 @@ from apache_beam.typehints.typehints import is_consistent_with from apache_beam.utils import urns +from past.builtins import basestring + __all__ = [ 'DoFn', 'CombineFn', diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index c7fc641804dc..9a2abd2e4781 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -611,7 +611,7 @@ def __init__(self, fn, *args, **kwargs): super(PTransformWithSideInputs, self).__init__() if (any([isinstance(v, pvalue.PCollection) for v in args]) or - any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])): + any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py index 28bf8f5ba6f3..3611de6be7e3 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference.py +++ b/sdks/python/apache_beam/typehints/trivial_inference.py @@ -41,6 +41,11 @@ class TypeInferenceError(ValueError): pass +try: + from types import InstanceType +except ImportError: + InstanceType = object + def instance_to_type(o): """Given a Python object o, return the corresponding type hint. """ diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index abef0279b827..b90deabf79e7 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -67,6 +67,7 @@ import copy import sys import types +from future.utils import with_metaclass __all__ = [ 'Any', @@ -990,7 +991,7 @@ def __getitem__(self, type_param): IteratorTypeConstraint = IteratorHint.IteratorTypeConstraint -class WindowedTypeConstraint(TypeConstraint): +class WindowedTypeConstraint(with_metaclass(GetitemConstructor, TypeConstraint)): """A type constraint for WindowedValue objects. Mostly for internal use. @@ -998,7 +999,6 @@ class WindowedTypeConstraint(TypeConstraint): Attributes: inner_type: The type which the element should be an instance of. """ - __metaclass__ = GetitemConstructor def __init__(self, inner_type): self.inner_type = inner_type From bec24c9c563b5bd4b43660025404fd46030b9ea5 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 3 Nov 2017 22:00:14 -0700 Subject: [PATCH 12/27] metaclass rewrite rule --- sdks/python/apache_beam/io/filesystem.py | 4 ++-- sdks/python/apache_beam/pipeline.py | 4 ++-- .../apache_beam/runners/worker/data_plane.py | 9 +++------ .../python/apache_beam/testing/test_stream.py | 5 ++--- .../python/apache_beam/transforms/timeutil.py | 9 +++------ sdks/python/apache_beam/transforms/trigger.py | 19 ++++++------------- sdks/python/apache_beam/transforms/window.py | 5 ++--- 7 files changed, 20 insertions(+), 35 deletions(-) diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 9601cf495c47..252cdefe241f 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -29,6 +29,7 @@ from six import integer_types from apache_beam.utils.plugin import BeamPlugin +from future.utils import with_metaclass logger = logging.getLogger(__name__) @@ -423,14 +424,13 @@ def __init__(self, msg, exception_details=None): self.exception_details = exception_details -class FileSystem(BeamPlugin): +class FileSystem(with_metaclass(abc.ABCMeta, BeamPlugin)): """A class that defines the functions that can be performed on a filesystem. All methods are abstract and they are for file system providers to implement. Clients should use the FileSystems class to interact with the correct file system based on the provided file pattern scheme. """ - __metaclass__ = abc.ABCMeta CHUNK_SIZE = 1 # Chuck size in the batch operations def __init__(self, pipeline_options): diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 71d97ba5d21f..98d2b14f137a 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -70,6 +70,7 @@ from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints from apache_beam.utils.annotations import deprecated +from future.utils import with_metaclass __all__ = ['Pipeline', 'PTransformOverride'] @@ -859,7 +860,7 @@ def is_side_input(tag): return result -class PTransformOverride(object): +class PTransformOverride(with_metaclass(abc.ABCMeta, object)): """For internal use only; no backwards-compatibility guarantees. Gives a matcher and replacements for matching PTransforms. @@ -867,7 +868,6 @@ class PTransformOverride(object): TODO: Update this to support cases where input and/our output types are different. """ - __metaclass__ = abc.ABCMeta @abc.abstractmethod def matches(self, applied_ptransform): diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 2e4f2d6f69a7..0117192e999e 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -33,6 +33,7 @@ from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc +from future.utils import with_metaclass # This module is experimental. No backwards-compatibility guarantees. @@ -49,7 +50,7 @@ def close(self): self._close_callback(self.get()) -class DataChannel(object): +class DataChannel(with_metaclass(abc.ABCMeta, object)): """Represents a channel for reading and writing data over the data plane. Read from this channel with the input_elements method:: @@ -68,8 +69,6 @@ class DataChannel(object): data_channel.close() """ - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def input_elements(self, instruction_id, expected_targets): """Returns an iterable of all Element.Data bundles for instruction_id. @@ -270,11 +269,9 @@ def Data(self, elements_iterator, context): yield elements -class DataChannelFactory(object): +class DataChannelFactory(with_metaclass(abc.ABCMeta, object)): """An abstract factory for creating ``DataChannel``.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def create_data_channel(self, remote_grpc_port): """Returns a ``DataChannel`` from the given RemoteGrpcPort.""" diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index 8a63e7bd0561..0598c1b7fac2 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -31,6 +31,7 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp from apache_beam.utils.windowed_value import WindowedValue +from future.utils import with_metaclass __all__ = [ 'Event', @@ -41,11 +42,9 @@ ] -class Event(object): +class Event(with_metaclass(ABCMeta, object)): """Test stream event to be emitted during execution of a TestStream.""" - __metaclass__ = ABCMeta - def __cmp__(self, other): if type(self) is not type(other): return cmp(type(self), type(other)) diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 8d63d49baad0..87efb954c829 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -21,6 +21,7 @@ from abc import ABCMeta from abc import abstractmethod +from future.utils import with_metaclass __all__ = [ 'TimeDomain', @@ -43,11 +44,9 @@ def from_string(domain): raise ValueError('Unknown time domain: %s' % domain) -class TimestampCombinerImpl(object): +class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): """Implementation of TimestampCombiner.""" - __metaclass__ = ABCMeta - @abstractmethod def assign_output_time(self, window, input_timestamp): pass @@ -72,11 +71,9 @@ def merge(self, unused_result_window, merging_timestamps): return self.combine_all(merging_timestamps) -class DependsOnlyOnWindow(TimestampCombinerImpl): +class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): """TimestampCombinerImpl that only depends on the window.""" - __metaclass__ = ABCMeta - def combine(self, output_timestamp, other_output_timestamp): return output_timestamp diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index b4bd6a2d5cda..586321eca960 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -40,6 +40,7 @@ from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import TIME_GRANULARITY +from future.utils import with_metaclass # AfterCount is experimental. No backwards compatibility guarantees. @@ -66,14 +67,13 @@ class AccumulationMode(object): # RETRACTING = 3 -class _StateTag(object): +class _StateTag(with_metaclass(ABCMeta, object)): """An identifier used to store and retrieve typed, combinable state. The given tag must be unique for this stage. If CombineFn is None then all elements will be returned as a list, otherwise the given CombineFn will be applied (possibly incrementally and eagerly) when adding elements. """ - __metaclass__ = ABCMeta def __init__(self, tag): self.tag = tag @@ -134,12 +134,11 @@ def with_prefix(self, prefix): # pylint: disable=unused-argument # TODO(robertwb): Provisional API, Java likely to change as well. -class TriggerFn(object): +class TriggerFn(with_metaclass(ABCMeta, object)): """A TriggerFn determines when window (panes) are emitted. See https://beam.apache.org/documentation/programming-guide/#triggers """ - __metaclass__ = ABCMeta @abstractmethod def on_element(self, element, window, context): @@ -456,9 +455,7 @@ def to_runner_api(self, context): subtrigger=self.underlying.to_runner_api(context))) -class _ParallelTriggerFn(TriggerFn): - - __metaclass__ = ABCMeta +class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): def __init__(self, *triggers): self.triggers = triggers @@ -678,14 +675,12 @@ def clear_state(self, tag): # pylint: disable=unused-argument -class SimpleState(object): +class SimpleState(with_metaclass(ABCMeta, object)): """Basic state storage interface used for triggering. Only timers must hold the watermark (by their timestamp). """ - __metaclass__ = ABCMeta - @abstractmethod def set_timer(self, window, name, time_domain, timestamp): pass @@ -850,11 +845,9 @@ def create_trigger_driver(windowing, is_batch=False, phased_combine_fn=None): return driver -class TriggerDriver(object): +class TriggerDriver(with_metaclass(ABCMeta, object)): """Breaks a series of bundle and timer firings into window (pane)s.""" - __metaclass__ = ABCMeta - @abstractmethod def process_elements(self, state, windowed_values, output_watermark): pass diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c250e8c6d365..67d447d7f7ab 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -67,6 +67,7 @@ from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.windowed_value import WindowedValue +from future.utils import with_metaclass __all__ = [ 'TimestampCombiner', @@ -108,11 +109,9 @@ def get_impl(timestamp_combiner, window_fn): raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner) -class WindowFn(urns.RunnerApiFn): +class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): """An abstract windowing function defining a basic assign and merge.""" - __metaclass__ = abc.ABCMeta - class AssignContext(object): """Context passed to WindowFn.assign().""" From 39de5e26783f6634302585b85ea5b58328d5e96a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 20 Aug 2017 14:42:33 -0700 Subject: [PATCH 13/27] Add __hash__ methods in places where needed (in progress) --- .../apache_beam/coders/coders_test_common.py | 4 +++- sdks/python/apache_beam/transforms/core.py | 5 ++++- sdks/python/apache_beam/transforms/trigger.py | 19 +++++++++++++++++++ sdks/python/apache_beam/transforms/window.py | 18 +++++++++++++++--- .../apache_beam/utils/windowed_value.py | 8 ++++++++ 5 files changed, 49 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index fc7279d5e011..bcb2253be46a 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -65,7 +65,9 @@ def tearDownClass(cls): standard -= set([coders.Coder, coders.FastCoder, coders.ProtoCoder, - coders.ToStringCoder]) + coders.ToStringCoder, + # TODO(remove this after rest of tests working): + coders.WindowedValueCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 50582f7209bf..19b3367ba0d9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1434,7 +1434,6 @@ def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) - class _GroupAlsoByWindowDoFn(DoFn): # TODO(robertwb): Support combiner lifting. @@ -1546,6 +1545,10 @@ def __eq__(self, other): and self.timestamp_combiner == other.timestamp_combiner) return False + def __hash__(self): + return hash((self.windowfn, self.triggerfn, self.accumulation_mode, + self.timestamp_combiner)) + def is_default(self): return self._is_default diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 586321eca960..7d50fcd83a03 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -249,8 +249,12 @@ def reset(self, window, context): context.clear_timer('', TimeDomain.WATERMARK) def __eq__(self, other): + """Since there should be only one default trigger, return if types equal.""" return type(self) == type(other) + def __hash__(self): + return hash(type(self)) + @staticmethod def from_runner_api(proto, context): return DefaultTrigger() @@ -389,6 +393,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.count == other.count + def __hash__(self): + return hash(self.count) + def on_element(self, element, window, context): context.add_state(self.COUNT_TAG, 1) @@ -427,6 +434,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.underlying == other.underlying + def __hash__(self): + return hash(self.underlying) + def on_element(self, element, window, context): # get window from context? self.underlying.on_element(element, window, context) @@ -467,6 +477,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash(self.triggers) + @abstractmethod def combine_op(self, trigger_results): pass @@ -558,6 +571,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash(self.triggers) + def on_element(self, element, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): @@ -897,6 +913,9 @@ def __eq__(self, other): else: return NotImplemented + def __hash__(self): + return hash(list(self)) + def __ne__(self, other): return not self == other diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 67d447d7f7ab..3e415c48f249 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -221,6 +221,9 @@ def __hash__(self): def __eq__(self, other): return self.start == other.start and self.end == other.end + def __hash__(self): + return hash((self.start, self.end)) + def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) @@ -266,13 +269,13 @@ def __init__(self): def __repr__(self): return 'GlobalWindow' - def __hash__(self): - return hash(type(self)) - def __eq__(self, other): # Global windows are always and only equal to each other. return self is other or type(self) is type(other) + def __hash__(self): + return hash(type(self)) + class NonMergingWindowFn(WindowFn): @@ -346,6 +349,9 @@ def __eq__(self, other): if type(self) == type(other) == FixedWindows: return self.size == other.size and self.offset == other.offset + def __hash__(self): + return hash((self.size, self.offset)) + def __ne__(self, other): return not self == other @@ -405,6 +411,9 @@ def __eq__(self, other): and self.offset == other.offset and self.period == other.period) + def __hash__(self): + return hash((self.size, self.offset, self.period)) + def to_runner_api_parameter(self, context): return (common_urns.SLIDING_WINDOWS_WINDOWFN, standard_window_fns_pb2.SlidingWindowsPayload( @@ -472,6 +481,9 @@ def __eq__(self, other): if type(self) == type(other) == Sessions: return self.gap_size == other.gap_size + def __hash__(self): + return hash((self.gap_size)) + def to_runner_api_parameter(self, context): return (common_urns.SESSION_WINDOWS_WINDOWFN, standard_window_fns_pb2.SessionsPayload( diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index be2785432a14..a46d888f5f2f 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -92,6 +92,14 @@ def _typed_eq(left, right): and left.value == right.value and left.windows == right.windows) + def __hash__(self): + return hash(self.timestamp_micros, self.value, self.windows) + + def __eq__(self, other): + if type(self) is not type(other): + return False + return WindowedValue._typed_eq(self, other) + def with_value(self, new_value): """Creates a new WindowedValue with the same timestamps and windows as this. From b3fef828d3cd9096e32c7afd385361b7a2a8a190 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 21 Aug 2017 12:08:44 -0700 Subject: [PATCH 14/27] Provide a base ne using eq for windows --- sdks/python/apache_beam/transforms/window.py | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 3e415c48f249..b9df5007e16c 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -193,6 +193,33 @@ def __cmp__(self, other): # Order first by endpoint, then arbitrarily. return cmp(self.end, other.end) or cmp(hash(self), hash(other)) + def __lt__(self, other): + # Order first by endpoint, then arbitrarily. + if self.end == other.end: + return hash(self) < hash(other) + return self.end < other.end + + def __gt__(self, other): + # Order first by endpoint, then arbitrarily. + if self.end == other.end: + return hash(self) > hash(other) + return self.end > other.end + + def __le__(self, other): + # Order first by endpoint, then arbitrarily. + if self.end == other.end: + return hash(self) <= hash(other) + return self.end <= other.end + + def __ge__(self, other): + # Order first by endpoint, then arbitrarily. + if self.end == other.end: + return hash(self) >= hash(other) + return self.end >= other.end + + def __ne__(self, other): + return not (self.__eq__(other)) + def __eq__(self, other): raise NotImplementedError From f4e8d6145f6ce115438b8616f95b37c8a864480a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 21 Aug 2017 02:03:52 -0700 Subject: [PATCH 15/27] Provide eq/hashing/sorting for TimestampedValue --- sdks/python/apache_beam/transforms/window.py | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index b9df5007e16c..de60391c6df9 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -279,6 +279,30 @@ def __cmp__(self, other): return cmp(type(self), type(other)) return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + def __eq__(self, other): + if type(self) is not type(other): + return False + else: + return self.value == other.value and self.timestamp == other.timestamp + + def __hash__(self): + return hash((self.value, self.timestamp)) + + def __lt__(self, other): + return (self.value, self.timestamp) < (other.value, other.timestamp) + + def __gt__(self, other): + return (self.value, self.timestamp) > (other.value, other.timestamp) + + def __le__(self, other): + return (self.value, self.timestamp) <= (other.value, other.timestamp) + + def __ge__(self, other): + return (self.value, self.timestamp) >= (other.value, other.timestamp) + + def __ne__(self, other): + return not self.__eq__(other) + class GlobalWindow(BoundedWindow): """The default window into which all data is placed (via GlobalWindows).""" From ab2094b6d9444904109012d3606910dc005a5535 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 20 Aug 2017 14:11:07 -0700 Subject: [PATCH 16/27] Rich comparisions requirement --- sdks/python/apache_beam/coders/coders.py | 6 +- sdks/python/apache_beam/utils/timestamp.py | 75 ++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index c20eb612497b..0d8c04b00e92 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -67,7 +67,11 @@ def serialize_coder(coder): from apache_beam.internal import pickler - return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder)) + result = '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder)) + if sys.version_info[0] == 3: + return result.encode("LATIN-1") + else: + return result def deserialize_coder(serialized): diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index b3e840ee284e..a0c70856b4f7 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -98,9 +98,45 @@ def __cmp__(self, other): other = Timestamp.of(other) return cmp(self.micros, other.micros) + def __ne__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros != other.micros + + def __lt__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros < other.micros + + def __le__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros <= other.micros + + def __gt__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros > other.micros + + def __ge__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros >= other.micros + def __hash__(self): return hash(self.micros) + def __eq__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Duration): + other = Timestamp.of(other) + return self.micros == other.micros + def __add__(self, other): other = Duration.of(other) return Timestamp(micros=self.micros + other.micros) @@ -176,6 +212,45 @@ def __cmp__(self, other): other = Duration.of(other) return cmp(self.micros, other.micros) + def __ne__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros != other.micros + + def __lt__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros < other.micros + + def __le__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros <= other.micros + + def __gt__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros > other.micros + + def __ge__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros >= other.micros + + def __hash__(self): + return hash(self.micros) + + def __eq__(self, other): + # Allow comparisons between Duration and Timestamp values. + if not isinstance(other, Timestamp): + other = Duration.of(other) + return self.micros == other.micros + def __hash__(self): return hash(self.micros) From 90b8fdb6d1bf95477e22a310e8936841134c9a33 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 21 Aug 2017 11:44:37 -0700 Subject: [PATCH 17/27] If we are passed in bytes for the lookup in pipeline_context do the lookup as a string --- .../apache_beam/runners/pipeline_context.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index dd8e0518acd0..87e24c7af69a 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -21,6 +21,9 @@ """ +from builtins import object +import sys + from apache_beam import coders from apache_beam import pipeline from apache_beam import pvalue @@ -64,10 +67,18 @@ def get_proto(self, obj, label=None): return self._id_to_proto[self.get_id(obj, label)] def get_by_id(self, id): - if id not in self._id_to_obj: - self._id_to_obj[id] = self._obj_type.from_runner_api( - self._id_to_proto[id], self._pipeline_context) - return self._id_to_obj[id] + if sys.version_info[0] >= 3 and isinstance(id, bytes): + myid = id.decode("utf-8") + else: + myid = id + try: + if myid not in self._id_to_obj: + self._id_to_obj[myid] = self._obj_type.from_runner_api( + self._id_to_proto[myid], self._pipeline_context) + return self._id_to_obj[myid] + except: + raise Exception("Error occured fetching id " + + myid + " from "+str(self._id_to_obj)) def __getitem__(self, id): return self.get_by_id(id) From c4a87c7b13f5fc8741d414faa16bbb63813a9d69 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 00:16:12 -0700 Subject: [PATCH 18/27] queue and raise rewrites --- sdks/python/apache_beam/io/gcp/gcsio.py | 1 + .../apache_beam/runners/direct/executor.py | 17 ++++++++--------- .../runners/portability/fn_api_runner.py | 2 +- .../portability/universal_local_runner.py | 2 +- .../apache_beam/runners/worker/data_plane.py | 2 +- .../apache_beam/runners/worker/log_handler.py | 2 +- .../apache_beam/runners/worker/sdk_worker.py | 2 +- sdks/python/apache_beam/typehints/typecheck.py | 12 +++++++----- 8 files changed, 21 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 3bdf2e64ca2e..7f4c5ab41b49 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -27,6 +27,7 @@ import logging import multiprocessing import os +import queue import re import threading import time diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index d4d9cb5ca637..81a630be049e 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -22,7 +22,7 @@ import collections import itertools import logging -import Queue +import queue import sys import threading import traceback @@ -76,7 +76,7 @@ def _get_task_or_none(self): # shutdown. return self.queue.get( timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT) - except Queue.Empty: + except queue.Empty: return None def run(self): @@ -95,7 +95,7 @@ def shutdown(self): self.shutdown_requested = True def __init__(self, num_workers): - self.queue = Queue.Queue() + self.queue = queue.Queue() self.workers = [_ExecutorService._ExecutorServiceWorker( self.queue, i) for i in range(num_workers)] self.shutdown_requested = False @@ -120,7 +120,7 @@ def shutdown(self): try: self.queue.get_nowait() self.queue.task_done() - except Queue.Empty: + except queue.Empty: continue # All existing threads will eventually terminate (after they complete their # last task). @@ -397,8 +397,7 @@ def await_completion(self): update = self.visible_updates.take() try: if update.exception: - t, v, tb = update.exc_info - raise t, v, tb + raise update.exception finally: self.executor_service.shutdown() self.executor_service.await_completion() @@ -438,14 +437,14 @@ class _TypedUpdateQueue(object): def __init__(self, item_type): self._item_type = item_type - self._queue = Queue.Queue() + self._queue = queue.Queue() def poll(self): try: item = self._queue.get_nowait() self._queue.task_done() return item - except Queue.Empty: + except queue.Empty: return None def take(self): @@ -458,7 +457,7 @@ def take(self): item = self._queue.get(timeout=1) self._queue.task_done() return item - except Queue.Empty: + except queue.Empty: pass def offer(self, item): diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 63e4a68536eb..ae947100b30f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -20,7 +20,7 @@ import collections import copy import logging -import Queue as queue +import queue import re import threading import time diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index a631a0c847bf..dc327de4edc4 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -18,7 +18,7 @@ import functools import logging import os -import Queue as queue +import queue import socket import subprocess import sys diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 0117192e999e..d84aa32a0060 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -24,7 +24,7 @@ import abc import collections import logging -import Queue as queue +import queue import sys import threading diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py index 6d8a1d926713..48226da6f834 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler.py +++ b/sdks/python/apache_beam/runners/worker/log_handler.py @@ -18,7 +18,7 @@ import logging import math -import Queue as queue +import queue import threading import grpc diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 2767530adb0b..a783b0d25c65 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -21,7 +21,7 @@ from __future__ import print_function import logging -import Queue as queue +import queue import sys import threading import traceback diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index 0298f5eca067..d9c620180ceb 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -25,6 +25,8 @@ import sys import types +from future.utils import raise_with_traceback + from apache_beam import pipeline from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import core @@ -84,7 +86,7 @@ def wrapper(self, method, args, kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within ParDo(%s): ' '%s' % (self.full_label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + raise TypeCheckError(error_msg) else: return self._check_type(result) @@ -173,12 +175,12 @@ def _type_check(self, type_constraint, datum, is_input): try: check_constraint(type_constraint, datum) except CompositeTypeHintError as e: - raise TypeCheckError, e.args[0], sys.exc_info()[2] + raise TypeCheckError(e.message) except SimpleTypeHintError: error_msg = ("According to type-hint expected %s should be of type %s. " "Instead, received '%s', an instance of type %s." % (datum_type, type_constraint, datum, type(datum))) - raise TypeCheckError, error_msg, sys.exc_info()[2] + raise TypeCheckError(error_msg) class TypeCheckCombineFn(core.CombineFn): @@ -203,7 +205,7 @@ def add_input(self, accumulator, element, *args, **kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within %s: ' '%s' % (self._label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + raise_with_traceback(TypeCheckError(error_msg), sys.exc_info()[2]) return self._combinefn.add_input(accumulator, element, *args, **kwargs) def merge_accumulators(self, accumulators, *args, **kwargs): @@ -218,7 +220,7 @@ def extract_output(self, accumulator, *args, **kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within %s: ' '%s' % (self._label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + raise_with_traceback(TypeCheckError(error_msg), sys.exc_info()[2]) return result From 9807defbcfb3e57d6857bab62d72a71871c832ca Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 00:18:10 -0700 Subject: [PATCH 19/27] futurize -wn -f libfuturize.fixes.fix_future_standard_library_urllib ./apache_beam --- sdks/python/apache_beam/internal/gcp/auth.py | 8 +++++--- .../apache_beam/runners/dataflow/dataflow_runner.py | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 8478e1b475c0..3159d40e8681 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -17,11 +17,13 @@ """Dataflow credentials and authentication.""" +from future import standard_library +standard_library.install_aliases() import datetime import json import logging import os -import urllib2 +import urllib.request, urllib.error, urllib.parse from oauth2client.client import GoogleCredentials from oauth2client.client import OAuth2Credentials @@ -89,8 +91,8 @@ def _refresh(self, http_request): 'GCE_METADATA_ROOT', 'metadata.google.internal') token_url = ('http://{}/computeMetadata/v1/instance/service-accounts/' 'default/token').format(metadata_root) - req = urllib2.Request(token_url, headers={'Metadata-Flavor': 'Google'}) - token_data = json.loads(urllib2.urlopen(req).read()) + req = urllib.request.Request(token_url, headers={'Metadata-Flavor': 'Google'}) + token_data = json.loads(urllib.request.urlopen(req).read()) self.access_token = token_data['access_token'] self.token_expiry = (refresh_time + datetime.timedelta(seconds=token_data['expires_in'])) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 595ae590fb04..b59b349ab602 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -22,11 +22,13 @@ """ from __future__ import absolute_import +from future import standard_library +standard_library.install_aliases() import logging import threading import time import traceback -import urllib +import urllib.request, urllib.parse, urllib.error from collections import defaultdict import apache_beam as beam @@ -880,12 +882,12 @@ def deserialize_windowing_strategy(cls, serialized_data): @staticmethod def byte_array_to_json_string(raw_bytes): """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString.""" - return urllib.quote(raw_bytes) + return urllib.parse.quote(raw_bytes) @staticmethod def json_string_to_byte_array(encoded_string): """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray.""" - return urllib.unquote(encoded_string) + return urllib.parse.unquote(encoded_string) class DataflowPipelineResult(PipelineResult): From e65cd9286f723579c75bddbae78ec4ecd8490f2b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 01:04:59 -0700 Subject: [PATCH 20/27] Finish up getting apache_beam.examples.complete.game.game_stats_test.GameStatsTest to run, fix serialized values map over, switch some exception re-throws, iteritems, and handle peaking at the eval context. --- sdks/python/apache_beam/internal/util.py | 2 +- sdks/python/apache_beam/io/range_trackers.py | 2 ++ .../apache_beam/runners/direct/evaluation_context.py | 7 +++---- .../python/apache_beam/runners/direct/watermark_manager.py | 6 +++--- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/utils/retry.py | 2 +- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index e3f82a768983..e74dd4333ac6 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -104,7 +104,7 @@ def insert_values_in_args(args, kwargs, values): for arg in args] new_kwargs = dict( (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) - for k, v in sorted(kwargs.iteritems())) + for k, v in sorted(kwargs.items())) return (new_args, new_kwargs) diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 2da8736b1141..ee5224d62b0b 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -26,6 +26,8 @@ from apache_beam.io import iobase +from past.builtins import long + __all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker', 'OrderedPositionRangeTracker', 'UnsplittableRangeTracker'] diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 46176c9e969e..61381bc1d032 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -206,12 +206,11 @@ def handle_result( self._metrics.commit_logical(completed_bundle, result.logical_metric_updates) - # If the result is for a view, update side inputs container. if (result.uncommitted_output_bundles - and result.uncommitted_output_bundles[0].pcollection + and next(iter(result.uncommitted_output_bundles)).pcollection in self._pcollection_to_views): for view in self._pcollection_to_views[ - result.uncommitted_output_bundles[0].pcollection]: + next(iter(result.uncommitted_output_bundles)).pcollection]: for committed_bundle in committed_bundles: # side_input must be materialized. self._side_inputs_container.add_values( @@ -231,7 +230,7 @@ def handle_result( # Commit partial GBK states existing_keyed_state = self._transform_keyed_states[result.transform] - for k, v in result.partial_keyed_state.iteritems(): + for k, v in result.partial_keyed_state.items(): existing_keyed_state[k] = v return committed_bundles diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 084073f4fe71..00383255469c 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -148,7 +148,7 @@ def extract_all_timers(self): and reports if there are any timers set.""" all_timers = [] has_realtime_timer = False - for applied_ptransform, tw in self._transform_to_watermarks.iteritems(): + for applied_ptransform, tw in self._transform_to_watermarks.items(): fired_timers, had_realtime_timer = tw.extract_transform_timers() if fired_timers: all_timers.append((applied_ptransform, fired_timers)) @@ -194,7 +194,7 @@ def output_watermark(self): def hold(self, keyed_earliest_holds): with self._lock: - for key, hold_value in keyed_earliest_holds.iteritems(): + for key, hold_value in keyed_earliest_holds.items(): self._keyed_earliest_holds[key] = hold_value if (hold_value is None or hold_value == WatermarkManager.WATERMARK_POS_INF): @@ -256,7 +256,7 @@ def extract_transform_timers(self): with self._lock: fired_timers = [] has_realtime_timer = False - for encoded_key, state in self._keyed_states.iteritems(): + for encoded_key, state in self._keyed_states.items(): timers, had_realtime_timer = state.get_timers( watermark=self._input_watermark, processing_time=self._clock.time()) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 19b3367ba0d9..09441860ecd2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1767,7 +1767,7 @@ def __init__(self, serialized_values, coder): self._coder = coder self._serialized_values = [] self._total_size = 0 - self._serialized_values = serialized_values + self._serialized_values = list(serialized_values) self._total_size = sum(map(len, self._serialized_values)) def read(self, range_tracker): diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 927da14678c1..295268912fba 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -185,7 +185,7 @@ def wrapper(*args, **kwargs): sleep_interval = next(retry_intervals) except StopIteration: # Re-raise the original exception since we finished the retries. - raise exn, None, exn_traceback # pylint: disable=raising-bad-type + raise exn # pylint: disable=raising-bad-type logger( 'Retry with exponential backoff: waiting for %s seconds before ' From 6672477a734e8b9aa201238166de9c80a2cef0f0 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 01:21:04 -0700 Subject: [PATCH 21/27] Fix the write_byte on slow stream to just use bytes rather than encode --- sdks/python/apache_beam/coders/slow_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 8dc3f75ab684..f62ccddb6d67 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -43,7 +43,7 @@ def write(self, b, nested=False): self.data.append(b.encode("LATIN-1")) def write_byte(self, val): - self.data.append(chr(val).encode("LATIN-1")) + self.data.append(bytes(chr(val), "latin1")) def write_var_int64(self, v): if v < 0: From f1bd12101784e26feb9b666660cc1d47778e2220 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 01:25:14 -0700 Subject: [PATCH 22/27] Fix some bytes coders tests to use bytes in py3 --- sdks/python/apache_beam/coders/coders.py | 2 +- .../apache_beam/coders/coders_test_common.py | 26 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 0d8c04b00e92..c96922f0c080 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -76,7 +76,7 @@ def serialize_coder(coder): def deserialize_coder(serialized): from apache_beam.internal import pickler - return pickler.loads(serialized.split('$', 1)[1]) + return pickler.loads(bytes(serialized.split('$', 1)[1], "latin-1")) # pylint: enable=wrong-import-order, wrong-import-position diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index bcb2253be46a..f36548c198f2 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -141,7 +141,7 @@ def test_fast_primitives_coder(self): self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,)) def test_bytes_coder(self): - self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000) + self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000) def test_varint_coder(self): # Small ints. @@ -208,14 +208,14 @@ def test_tuple_coder(self): kv_coder.as_cloud_object()) # Test binary representation self.assertEqual( - '\x04abc', + b'\x04abc', kv_coder.encode((4, 'abc'))) # Test unnested self.check_coder( kv_coder, - (1, 'a'), - (-2, 'a' * 100), - (300, 'abc\0' * 5)) + (1, b'a'), + (-2, b'a' * 100), + (300, b'abc\0' * 5)) # Test nested self.check_coder( coders.TupleCoder( @@ -292,7 +292,7 @@ def test_windowed_value_coder(self): }, coder.as_cloud_object()) # Test binary representation - self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', + self.assertEqual(b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', coder.encode(window.GlobalWindows.windowed_value(1))) # Test decoding large timestamp @@ -361,16 +361,16 @@ def test_length_prefix_coder(self): }, coder.as_cloud_object()) # Test binary representation - self.assertEqual('\x00', coder.encode('')) - self.assertEqual('\x01a', coder.encode('a')) - self.assertEqual('\x02bc', coder.encode('bc')) - self.assertEqual('\xff\x7f' + 'z' * 16383, coder.encode('z' * 16383)) + self.assertEqual(b'\x00', coder.encode('')) + self.assertEqual(b'\x01a', coder.encode('a')) + self.assertEqual(b'\x02bc', coder.encode('bc')) + self.assertEqual(b'\xff\x7f' + b'z' * 16383, coder.encode('z' * 16383)) # Test unnested - self.check_coder(coder, '', 'a', 'bc', 'def') + self.check_coder(coder, b'', b'a', b'bc', b'def') # Test nested self.check_coder(coders.TupleCoder((coder, coder)), - ('', 'a'), - ('bc', 'def')) + (b'', b'a'), + (b'bc', b'def')) def test_nested_observables(self): class FakeObservableIterator(observable.ObservableMixin): From e8b3b74456adb5dd2c0ce0f90b015b0d922555c7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 01:25:38 -0700 Subject: [PATCH 23/27] Change the fall through for encoding a bit and update past imports --- sdks/python/apache_beam/coders/coder_impl.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index b22bf141b976..50cc37ef9091 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -34,7 +34,7 @@ from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Timestamp -from past.builtins import unicode +from past.builtins import basestring, unicode, long # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -184,7 +184,7 @@ def __init__(self, coder, step_label): self._step_label = step_label def _check_safe(self, value): - if isinstance(value, (str, unicode, long, int, float)): + if isinstance(value, (str, basestring, bytes, long, int, float)): pass elif value is None: pass @@ -272,10 +272,10 @@ def encode_to_stream(self, value, stream, nested): elif t is float: stream.write_byte(FLOAT_TYPE) stream.write_bigendian_double(value) - elif t is str: + elif t is bytes: stream.write_byte(STR_TYPE) stream.write(value, nested) - elif t is unicode: + elif t is str or t is basestring or t is unicode: unicode_value = value # for typing stream.write_byte(UNICODE_TYPE) stream.write(unicode_value.encode('utf-8'), nested) @@ -315,7 +315,7 @@ def decode_from_stream(self, stream, nested): vlen = stream.read_var_int64() vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)] if t == LIST_TYPE: - return vlist + return list(vlist) elif t == TUPLE_TYPE: return tuple(vlist) return set(vlist) From 27e91ed193ad91ba3d30f13d80ac0e87d584ac0d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 01:25:57 -0700 Subject: [PATCH 24/27] Only wrap in bytes for py3 --- sdks/python/apache_beam/coders/coders.py | 2 +- sdks/python/apache_beam/coders/slow_stream.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index c96922f0c080..0d8c04b00e92 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -76,7 +76,7 @@ def serialize_coder(coder): def deserialize_coder(serialized): from apache_beam.internal import pickler - return pickler.loads(bytes(serialized.split('$', 1)[1], "latin-1")) + return pickler.loads(serialized.split('$', 1)[1]) # pylint: enable=wrong-import-order, wrong-import-position diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index f62ccddb6d67..641cc0799032 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -43,7 +43,10 @@ def write(self, b, nested=False): self.data.append(b.encode("LATIN-1")) def write_byte(self, val): - self.data.append(bytes(chr(val), "latin1")) + if sys.version_info[0] == 3: + self.data.append(bytes(chr(val), "latin1")) + else: + self.data.append(chr(val)) def write_var_int64(self, v): if v < 0: From cf54dfde7cd0882422a92e36825c7fe5cbbaa500 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 4 Nov 2017 21:09:21 -0700 Subject: [PATCH 25/27] Fix remaining coder issues accross py2/3 --- sdks/python/apache_beam/coders/coder_impl.py | 16 ++++++++-------- sdks/python/apache_beam/coders/coders.py | 16 +++++++++------- .../apache_beam/coders/coders_test_common.py | 13 ++++++++++--- sdks/python/apache_beam/coders/slow_stream.py | 7 +++++-- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 50cc37ef9091..e9bb689934bf 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -51,7 +51,7 @@ from .slow_stream import ByteCountingOutputStream from .slow_stream import get_varint_size # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports - +from past.utils import old_div class CoderImpl(object): """For internal use only; no backwards-compatibility guarantees.""" @@ -289,7 +289,7 @@ def encode_to_stream(self, value, stream, nested): dict_value = value # for typing stream.write_byte(DICT_TYPE) stream.write_var_int64(len(dict_value)) - for k, v in dict_value.iteritems(): + for k, v in dict_value.items(): self.encode_to_stream(k, stream, True) self.encode_to_stream(v, stream, True) elif t is bool: @@ -381,8 +381,8 @@ def _from_normal_time(self, value): def encode_to_stream(self, value, out, nested): span_micros = value.end.micros - value.start.micros - out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000)) - out.write_var_int64(span_micros / 1000) + out.write_bigendian_uint64(self._from_normal_time(old_div(value.end.micros, 1000))) + out.write_var_int64(old_div(span_micros, 1000)) def decode_from_stream(self, in_, nested): end_millis = self._to_normal_time(in_.read_bigendian_uint64()) @@ -396,7 +396,7 @@ def estimate_size(self, value, nested=False): # An IntervalWindow is context-insensitive, with a timestamp (8 bytes) # and a varint timespam. span = value.end.micros - value.start.micros - return 8 + get_varint_size(span / 1000) + return 8 + get_varint_size(old_div(span, 1000)) class TimestampCoderImpl(StreamCoderImpl): @@ -693,7 +693,7 @@ def encode_to_stream(self, value, out, nested): # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. self._from_normal_time( - restore_sign * (abs(wv.timestamp_micros) / 1000))) + restore_sign * old_div(abs(wv.timestamp_micros), 1000))) self._windows_coder.encode_to_stream(wv.windows, out, True) # Default PaneInfo encoded byte representing NO_FIRING. # TODO(BEAM-1522): Remove the hard coding here once PaneInfo is supported. @@ -708,9 +708,9 @@ def decode_from_stream(self, in_stream, nested): # were indeed MIN/MAX timestamps. # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. - if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000): + if timestamp == -(old_div(abs(MIN_TIMESTAMP.micros), 1000)): timestamp = MIN_TIMESTAMP.micros - elif timestamp == (MAX_TIMESTAMP.micros / 1000): + elif timestamp == (old_div(MAX_TIMESTAMP.micros, 1000)): timestamp = MAX_TIMESTAMP.micros else: timestamp *= 1000 diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 0d8c04b00e92..acb4f726fdbf 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -67,16 +67,15 @@ def serialize_coder(coder): from apache_beam.internal import pickler - result = '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder)) - if sys.version_info[0] == 3: - return result.encode("LATIN-1") - else: - return result + # TODO: Do we need this class name for anything or could we just simplify this? + result = '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder).decode()) + return result def deserialize_coder(serialized): from apache_beam.internal import pickler - return pickler.loads(serialized.split('$', 1)[1]) + split = str(serialized).split('$', 1) + return pickler.loads(split[1]) # pylint: enable=wrong-import-order, wrong-import-position @@ -225,6 +224,9 @@ def __eq__(self, other): and self._dict_without_impl() == other._dict_without_impl()) # pylint: enable=protected-access + def __hash__(self): + return hash(type(self)) + _known_urns = {} @classmethod @@ -275,7 +277,7 @@ def from_runner_api(cls, coder_proto, context): def to_runner_api_parameter(self, context): return ( python_urns.PICKLED_CODER, - wrappers_pb2.BytesValue(value=serialize_coder(self)), + wrappers_pb2.BytesValue(value=serialize_coder(self).encode()), ()) @staticmethod diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index f36548c198f2..beceabe20965 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -23,6 +23,7 @@ import unittest import dill +import sys from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders @@ -83,6 +84,12 @@ def _observe_nested(cls, coder): cls.seen_nested.add(type(c)) cls._observe_nested(c) + def assertItemsEqual(self, a, b): + if sys.version_info[0] >= 3: + self.assertCountEqual(a, b) + else: + super(CodersTest, self).assertItemsEqual(a, b) + def check_coder(self, coder, *values): self._observe(coder) for v in values: @@ -105,7 +112,7 @@ def test_custom_coder(self): self.check_coder(CustomCoder(), 1, -10, 5) self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())), - (1, 'a'), (-10, 'b'), (5, 'c')) + (1, b'a'), (-10, b'b'), (5, b'c')) def test_pickle_coder(self): self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) @@ -192,7 +199,7 @@ def test_timestamp_coder(self): timestamp.Timestamp(micros=1234567890123456789)) self.check_coder( coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())), - (timestamp.Timestamp.of(27), 'abc')) + (timestamp.Timestamp.of(27), b'abc')) def test_tuple_coder(self): kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())) @@ -334,7 +341,7 @@ def test_proto_coder(self): proto_coder = coders.ProtoCoder(ma.__class__) self.check_coder(proto_coder, ma) self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())), - (ma, 'a'), (mb, 'b')) + (ma, b'a'), (mb, b'b')) def test_global_window_coder(self): coder = coders.GlobalWindowCoder() diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 641cc0799032..2aefbc6532c6 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -116,7 +116,10 @@ class InputStream(object): A pure Python implementation of stream.InputStream.""" def __init__(self, data): - self.data = data + if sys.version_info[0] == 3 and isinstance(data, str): + self.data = bytes(data, "latin-1") + else: + self.data = data self.pos = 0 def size(self): @@ -140,7 +143,7 @@ def read_var_int64(self): shift = 0 result = 0 while True: - byte = self.read_byte() + byte = int(self.read_byte()) if byte < 0: raise RuntimeError('VarLong not terminated.') From 1e4fbd4866aa50c725cf26d057ab3fd9ab5daea5 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 14 Feb 2018 05:24:05 -0800 Subject: [PATCH 26/27] Style fixes, remove uneeded imports after other unrelated things fixed upstream. --- sdks/python/apache_beam/coders/coder_impl.py | 20 ++++++++++--------- sdks/python/apache_beam/coders/coders.py | 15 +++++++------- .../apache_beam/coders/coders_test_common.py | 10 +++++----- sdks/python/apache_beam/coders/slow_stream.py | 6 ++++-- sdks/python/apache_beam/coders/typecoders.py | 4 ++-- .../apache_beam/examples/snippets/snippets.py | 4 ++-- .../examples/streaming_wordcount.py | 3 ++- .../examples/windowed_wordcount.py | 4 ++-- sdks/python/apache_beam/examples/wordcount.py | 4 ++-- .../examples/wordcount_debugging.py | 4 ++-- .../apache_beam/examples/wordcount_fnapi.py | 3 ++- .../apache_beam/examples/wordcount_minimal.py | 4 ++-- sdks/python/apache_beam/internal/gcp/auth.py | 10 ++++++++-- sdks/python/apache_beam/io/filesystem.py | 10 +++++----- .../apache_beam/io/gcp/datastore/v1/helper.py | 4 ++-- sdks/python/apache_beam/io/gcp/gcsio.py | 1 - sdks/python/apache_beam/io/gcp/pubsub.py | 5 ++--- sdks/python/apache_beam/io/gcp/pubsub_test.py | 3 +-- sdks/python/apache_beam/io/range_trackers.py | 2 -- sdks/python/apache_beam/pipeline.py | 3 ++- sdks/python/apache_beam/runners/common.py | 6 ++---- .../runners/dataflow/dataflow_runner.py | 9 +++++++-- .../runners/dataflow/ptransform_overrides.py | 2 +- .../runners/direct/direct_runner.py | 2 -- .../apache_beam/runners/pipeline_context.py | 5 ++--- .../apache_beam/runners/worker/data_plane.py | 2 +- .../python/apache_beam/testing/test_stream.py | 3 ++- .../apache_beam/transforms/combiners.py | 4 ++-- sdks/python/apache_beam/transforms/core.py | 7 ++++--- sdks/python/apache_beam/transforms/display.py | 1 - .../apache_beam/transforms/display_test.py | 3 +-- .../python/apache_beam/transforms/timeutil.py | 1 + sdks/python/apache_beam/transforms/trigger.py | 3 ++- sdks/python/apache_beam/transforms/util.py | 2 +- sdks/python/apache_beam/transforms/window.py | 4 ++-- sdks/python/apache_beam/typehints/opcodes.py | 2 -- .../typehints/trivial_inference.py | 5 ----- .../python/apache_beam/typehints/typehints.py | 4 +++- sdks/python/run_pylint.sh | 3 +++ 39 files changed, 98 insertions(+), 89 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index e9bb689934bf..cf1a84aeee26 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -28,14 +28,16 @@ """ from __future__ import absolute_import +from past.builtins import basestring +from past.builtins import long +from past.builtins import unicode + from apache_beam.coders import observable from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Timestamp -from past.builtins import basestring, unicode, long - # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import InputStream as create_InputStream @@ -51,7 +53,7 @@ from .slow_stream import ByteCountingOutputStream from .slow_stream import get_varint_size # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -from past.utils import old_div + class CoderImpl(object): """For internal use only; no backwards-compatibility guarantees.""" @@ -381,8 +383,8 @@ def _from_normal_time(self, value): def encode_to_stream(self, value, out, nested): span_micros = value.end.micros - value.start.micros - out.write_bigendian_uint64(self._from_normal_time(old_div(value.end.micros, 1000))) - out.write_var_int64(old_div(span_micros, 1000)) + out.write_bigendian_uint64(self._from_normal_time(value.end.micros // 1000)) + out.write_var_int64(span_micros // 1000) def decode_from_stream(self, in_, nested): end_millis = self._to_normal_time(in_.read_bigendian_uint64()) @@ -396,7 +398,7 @@ def estimate_size(self, value, nested=False): # An IntervalWindow is context-insensitive, with a timestamp (8 bytes) # and a varint timespam. span = value.end.micros - value.start.micros - return 8 + get_varint_size(old_div(span, 1000)) + return 8 + get_varint_size(span // 1000) class TimestampCoderImpl(StreamCoderImpl): @@ -693,7 +695,7 @@ def encode_to_stream(self, value, out, nested): # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. self._from_normal_time( - restore_sign * old_div(abs(wv.timestamp_micros), 1000))) + restore_sign * (abs(wv.timestamp_micros) // 1000))) self._windows_coder.encode_to_stream(wv.windows, out, True) # Default PaneInfo encoded byte representing NO_FIRING. # TODO(BEAM-1522): Remove the hard coding here once PaneInfo is supported. @@ -708,9 +710,9 @@ def decode_from_stream(self, in_stream, nested): # were indeed MIN/MAX timestamps. # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. - if timestamp == -(old_div(abs(MIN_TIMESTAMP.micros), 1000)): + if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000): timestamp = MIN_TIMESTAMP.micros - elif timestamp == (old_div(MAX_TIMESTAMP.micros, 1000)): + elif timestamp == (MAX_TIMESTAMP.micros // 1000): timestamp = MAX_TIMESTAMP.micros else: timestamp *= 1000 diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index acb4f726fdbf..8b215d1fe277 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -23,11 +23,6 @@ import base64 import sys -if sys.version_info[0] == 2: - import cPickle as pickle -else: - import pickle as pickle - from past.builtins import unicode import google.protobuf from google.protobuf import wrappers_pb2 @@ -38,6 +33,12 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import proto_utils +if sys.version_info[0] == 2: + import cPickle as pickle +else: + import pickle as pickle + from past.builtins import unicode + # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import get_varint_size @@ -46,7 +47,7 @@ # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -# pylint: disable=wrong-import-order, wrong-import-position +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports # Avoid dependencies on the full SDK. try: # Import dill from the pickler module to make sure our monkey-patching of dill @@ -67,7 +68,7 @@ def serialize_coder(coder): from apache_beam.internal import pickler - # TODO: Do we need this class name for anything or could we just simplify this? + # TODO: Do we need this class name for anything or could we just simplify? result = '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder).decode()) return result diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index beceabe20965..7de83474558c 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -20,10 +20,10 @@ import logging import math +import sys import unittest import dill -import sys from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders @@ -85,10 +85,10 @@ def _observe_nested(cls, coder): cls._observe_nested(c) def assertItemsEqual(self, a, b): - if sys.version_info[0] >= 3: - self.assertCountEqual(a, b) - else: - super(CodersTest, self).assertItemsEqual(a, b) + if sys.version_info[0] >= 3: + self.assertCountEqual(a, b) + else: + super(CodersTest, self).assertItemsEqual(a, b) def check_coder(self, coder, *values): self._observe(coder) diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 2aefbc6532c6..db910346f2db 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -22,8 +22,10 @@ import struct import sys + from past.builtins import basestring + class OutputStream(object): """For internal use only; no backwards-compatibility guarantees. @@ -33,8 +35,8 @@ def __init__(self): self.data = [] def write(self, b, nested=False): - assert (isinstance(b, basestring) or isinstance(b, bytes), - "%r is not a basestring or bytes it is a %r" % (b, type(b))) + assert isinstance(b, basestring, bytes), \ + "%r is not a basestring or bytes it is a %r" % (b, type(b)) if nested: self.write_var_int64(len(b)) if isinstance(b, bytes): diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 72fc64f31e24..799c9f87876b 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -64,11 +64,11 @@ def MakeXyzs(v): See apache_beam.typehints.decorators module for more details. """ +from past.builtins import unicode + from apache_beam.coders import coders from apache_beam.typehints import typehints -from past.builtins import unicode - __all__ = ['registry'] diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 5bc1188282cc..821a962b63eb 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -30,6 +30,8 @@ string. The tags can contain only letters, digits and _. """ +from past.builtins import unicode + import apache_beam as beam from apache_beam.io import iobase from apache_beam.io.range_trackers import OffsetRangeTracker @@ -40,8 +42,6 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.core import PTransform -from past.builtins import unicode - # Quiet some pylint warnings that happen because of the somewhat special # format for the code snippets. # pylint:disable=invalid-name diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index d4ec961c2e6c..3f72fb57b836 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -26,12 +26,13 @@ import argparse import logging +from past.builtins import unicode + import apache_beam as beam import apache_beam.transforms.window as window from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions -from past.builtins import unicode def split_fn(lines): import re diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py index ed7d20381454..457a239b9019 100644 --- a/sdks/python/apache_beam/examples/windowed_wordcount.py +++ b/sdks/python/apache_beam/examples/windowed_wordcount.py @@ -26,11 +26,11 @@ import argparse import logging +from past.builtins import unicode + import apache_beam as beam import apache_beam.transforms.window as window -from past.builtins import unicode - TABLE_SCHEMA = ('word:STRING, count:INTEGER, ' 'window_start:TIMESTAMP, window_end:TIMESTAMP') diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 83cfa3a40c10..4fdcfeb1741c 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -23,6 +23,8 @@ import logging import re +from past.builtins import unicode + import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import WriteToText @@ -31,8 +33,6 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions -from past.builtins import unicode - class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 929b99ce707c..23bb52b15691 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -45,6 +45,8 @@ import logging import re +from past.builtins import unicode + import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import WriteToText @@ -54,8 +56,6 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from past.builtins import unicode - class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regular expression.""" diff --git a/sdks/python/apache_beam/examples/wordcount_fnapi.py b/sdks/python/apache_beam/examples/wordcount_fnapi.py index 690ad65509be..bf4998af15e3 100644 --- a/sdks/python/apache_beam/examples/wordcount_fnapi.py +++ b/sdks/python/apache_beam/examples/wordcount_fnapi.py @@ -28,6 +28,8 @@ import logging import re +from past.builtins import unicode + import apache_beam as beam from apache_beam.io import ReadFromText # TODO(BEAM-2887): Enable after the issue is fixed. @@ -38,7 +40,6 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions -from past.builtins import unicode class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index cc4da53fa499..ef66a38ed550 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -50,14 +50,14 @@ import logging import re +from past.builtins import unicode + import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions -from past.builtins import unicode - def run(argv=None): """Main entry point; defines and runs the wordcount pipeline.""" diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 3159d40e8681..a425e23f8a03 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -17,18 +17,23 @@ """Dataflow credentials and authentication.""" +# See https://github.com/PyCQA/pylint/issues/1160 :( +# pylint: disable=wrong-import-position,wrong-import-order from future import standard_library standard_library.install_aliases() import datetime import json import logging import os -import urllib.request, urllib.error, urllib.parse +import urllib.request +import urllib.error +import urllib.parse from oauth2client.client import GoogleCredentials from oauth2client.client import OAuth2Credentials from apache_beam.utils import retry +# pylint: enable=wrong-import-position,wrong-import-order # When we are running in GCE, we can authenticate with VM credentials. is_running_in_gce = False @@ -91,7 +96,8 @@ def _refresh(self, http_request): 'GCE_METADATA_ROOT', 'metadata.google.internal') token_url = ('http://{}/computeMetadata/v1/instance/service-accounts/' 'default/token').format(metadata_root) - req = urllib.request.Request(token_url, headers={'Metadata-Flavor': 'Google'}) + req = urllib.request.Request(token_url, + headers={'Metadata-Flavor': 'Google'}) token_data = json.loads(urllib.request.urlopen(req).read()) self.access_token = token_data['access_token'] self.token_expiry = (refresh_time + diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 252cdefe241f..3a1c0a70756d 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -20,16 +20,16 @@ import abc import bz2 -import logging import io +import logging import os import time import zlib +from future.utils import with_metaclass from six import integer_types from apache_beam.utils.plugin import BeamPlugin -from future.utils import with_metaclass logger = logging.getLogger(__name__) @@ -238,7 +238,7 @@ def readline(self): if not self._decompressor: raise ValueError('decompressor not initialized') - io = io.StringIO() + sio = io.StringIO() while True: # Ensure that the internal buffer has at least half the read_size. Going # with half the _read_size (as opposed to a full _read_size) to ensure @@ -247,11 +247,11 @@ def readline(self): self._fetch_to_internal_buffer(self._read_size / 2) line = self._read_from_internal_buffer( lambda: self._read_buffer.readline()) - io.write(line) + sio.write(line) if line.endswith('\n') or not line: break # Newline or EOF reached. - return io.getvalue() + return sio.getvalue() def closed(self): return not self._file or self._file.closed() diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index 2399cec48c90..c6d58fa86fd9 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -26,6 +26,8 @@ import time from socket import error as SocketError +from past.builtins import unicode + # pylint: disable=ungrouped-imports from apache_beam.internal.gcp import auth from apache_beam.utils import retry @@ -47,8 +49,6 @@ # pylint: enable=ungrouped-imports -from past.builtins import unicode - def key_comparator(k1, k2): """A comparator for Datastore keys. diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 7f4c5ab41b49..3bdf2e64ca2e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -27,7 +27,6 @@ import logging import multiprocessing import os -import queue import re import threading import time diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index f909386d0067..20ea697c52d7 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -26,6 +26,8 @@ import re +from past.builtins import unicode + from apache_beam import coders from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write @@ -36,9 +38,6 @@ from apache_beam.transforms import window from apache_beam.transforms.display import DisplayDataItem - -from past.builtins import unicode - __all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub'] diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 05a98f87ba91..6eda15e14837 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -21,6 +21,7 @@ import unittest import hamcrest as hc +from past.builtins import unicode import apache_beam as beam from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub @@ -41,8 +42,6 @@ pubsub = None # pylint: enable=wrong-import-order, wrong-import-position -from past.builtins import unicode - @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestReadStringsFromPubSubOverride(unittest.TestCase): diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index ee5224d62b0b..2da8736b1141 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -26,8 +26,6 @@ from apache_beam.io import iobase -from past.builtins import long - __all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker', 'OrderedPositionRangeTracker', 'UnsplittableRangeTracker'] diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 98d2b14f137a..9d7e207a7401 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -53,6 +53,8 @@ import shutil import tempfile +from future.utils import with_metaclass + from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems @@ -70,7 +72,6 @@ from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints from apache_beam.utils.annotations import deprecated -from future.utils import with_metaclass __all__ = ['Pipeline', 'PTransformOverride'] diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index f236453b9285..f6a69311c9b7 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -499,7 +499,6 @@ def _reraise_augmented(self, exn): raise step_annotation = " [while running '%s']" % self.step_name # To emulate exception chaining (not available in Python 2). - original_traceback = sys.exc_info()[2] try: # Attempt to construct the same kind of exception # with an augmented message. @@ -513,7 +512,6 @@ def _reraise_augmented(self, exn): raise Exception(step_annotation) else: # To emulate exception chaining (not available in Python 2). - original_traceback = sys.exc_info()[2] try: # Attempt to construct the same kind of exception # with an augmented message. @@ -523,8 +521,8 @@ def _reraise_augmented(self, exn): # If anything goes wrong, construct a RuntimeError whose message # records the original exception's type and message. new_exn = RuntimeError( - traceback.format_exception_only(type(exn), exn)[-1].strip() - + step_annotation) + traceback.format_exception_only(type(exn), exn)[-1].strip() + + step_annotation) new_exn._tagged_with_step = True new_exn.args = exn.args raise new_exn diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index b59b349ab602..f119c87e6cdb 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -22,13 +22,17 @@ """ from __future__ import absolute_import +# See https://github.com/PyCQA/pylint/issues/1160 :( +# pylint: disable=wrong-import-position,wrong-import-order from future import standard_library standard_library.install_aliases() import logging import threading import time import traceback -import urllib.request, urllib.parse, urllib.error +import urllib.request +import urllib.parse +import urllib.error from collections import defaultdict import apache_beam as beam @@ -54,6 +58,7 @@ from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils.plugin import BeamPlugin +# pylint: enable=wrong-import-position,wrong-import-order __all__ = ['DataflowRunner'] @@ -281,7 +286,7 @@ def run_pipeline(self, pipeline): # Performing configured PTransform overrides. # Imported here to avoid circular dependencies. - # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride + # TODO: Remove the apache_beam.pipeline dependency CreatePTransformOverride from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride pipeline.replace_all(CreatePTransformOverride()) diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index 0e0e0c91a873..bf7835452af0 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -18,10 +18,10 @@ """Ptransform overrides for DataflowRunner.""" from __future__ import absolute_import - from apache_beam.coders import typecoders from apache_beam.pipeline import * + class CreatePTransformOverride(PTransformOverride): """A ``PTransformOverride`` for ``Create`` in streaming mode.""" diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index fb05776cd349..cb4eb43d7320 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -51,8 +51,6 @@ from apache_beam.transforms.core import _GroupByKeyOnly from apache_beam.transforms.ptransform import PTransform -from past.builtins import unicode - # Note that the BundleBasedDirectRunner and SwitchingDirectRunner names are # experimental and have no backwards compatibility guarantees. __all__ = ['BundleBasedDirectRunner', diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 87e24c7af69a..612afc4a5654 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -20,9 +20,8 @@ For internal use only; no backwards-compatibility guarantees. """ - -from builtins import object import sys +from builtins import object from apache_beam import coders from apache_beam import pipeline @@ -74,7 +73,7 @@ def get_by_id(self, id): try: if myid not in self._id_to_obj: self._id_to_obj[myid] = self._obj_type.from_runner_api( - self._id_to_proto[myid], self._pipeline_context) + self._id_to_proto[myid], self._pipeline_context) return self._id_to_obj[myid] except: raise Exception("Error occured fetching id " + diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index d84aa32a0060..1fc0a7887e75 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -29,11 +29,11 @@ import threading import grpc +from future.utils import with_metaclass from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc -from future.utils import with_metaclass # This module is experimental. No backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index 0598c1b7fac2..4a4895fc3bef 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -23,6 +23,8 @@ from abc import ABCMeta from abc import abstractmethod +from future.utils import with_metaclass + from apache_beam import coders from apache_beam import core from apache_beam import pvalue @@ -31,7 +33,6 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp from apache_beam.utils.windowed_value import WindowedValue -from future.utils import with_metaclass __all__ = [ 'Event', diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 5696859695b2..9b0c0e81e35e 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -22,6 +22,8 @@ import operator import random +from past.builtins import long + from apache_beam.transforms import core from apache_beam.transforms import cy_combiners from apache_beam.transforms import ptransform @@ -36,8 +38,6 @@ from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types -from past.builtins import long - __all__ = [ 'Count', 'Mean', diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 09441860ecd2..1d1349a2906e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -23,6 +23,8 @@ import inspect import types +from past.builtins import basestring + from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints @@ -55,8 +57,6 @@ from apache_beam.typehints.typehints import is_consistent_with from apache_beam.utils import urns -from past.builtins import basestring - __all__ = [ 'DoFn', 'CombineFn', @@ -1434,6 +1434,7 @@ def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) + class _GroupAlsoByWindowDoFn(DoFn): # TODO(robertwb): Support combiner lifting. @@ -1547,7 +1548,7 @@ def __eq__(self, other): def __hash__(self): return hash((self.windowfn, self.triggerfn, self.accumulation_mode, - self.timestamp_combiner)) + self.timestamp_combiner)) def is_default(self): return self._is_default diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index a93bda51de05..afb6ed4b4c7a 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -44,7 +44,6 @@ from datetime import datetime from datetime import timedelta - from past.builtins import unicode __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 5676cf8bbe8b..4152b9434031 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -24,6 +24,7 @@ import hamcrest as hc from hamcrest.core.base_matcher import BaseMatcher +from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions @@ -31,8 +32,6 @@ from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.display import HasDisplayData -from past.builtins import unicode - class DisplayDataItemMatcher(BaseMatcher): """ Matcher class for DisplayDataItems in unit tests. diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 87efb954c829..e8cf8dcb60b5 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -21,6 +21,7 @@ from abc import ABCMeta from abc import abstractmethod + from future.utils import with_metaclass __all__ = [ diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 7d50fcd83a03..a20c7a72523e 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -28,6 +28,8 @@ from abc import ABCMeta from abc import abstractmethod +from future.utils import with_metaclass + from apache_beam.coders import observable from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.transforms import combiners @@ -40,7 +42,6 @@ from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import TIME_GRANULARITY -from future.utils import with_metaclass # AfterCount is experimental. No backwards compatibility guarantees. diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 14a5c89175f1..0b5020a4d830 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -260,7 +260,7 @@ def _thin_data(self): odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else [] # Sort the pairs by how different they are. pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), - key=lambda x1_y1_x2_y2: x1_y1_x2_y2[1][0] / x1_y1_x2_y2[0][0]) + key=lambda x_y_x2_y2: x_y_x2_y2[1][0] / x_y_x2_y2[0][0]) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. threshold = 2 * len(pairs) / 3 self._data = ( diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index de60391c6df9..7d8fd4a498a7 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -51,6 +51,7 @@ import abc +from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 @@ -67,7 +68,6 @@ from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.windowed_value import WindowedValue -from future.utils import with_metaclass __all__ = [ 'TimestampCombiner', @@ -218,7 +218,7 @@ def __ge__(self, other): return self.end >= other.end def __ne__(self, other): - return not (self.__eq__(other)) + return not self.__eq__(other) def __eq__(self, other): raise NotImplementedError diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py index 4ea9f3177d04..7fae11b63eab 100644 --- a/sdks/python/apache_beam/typehints/opcodes.py +++ b/sdks/python/apache_beam/typehints/opcodes.py @@ -47,8 +47,6 @@ from .typehints import Tuple from .typehints import Union -from past.builtins import unicode - def pop_one(state, unused_arg): del state.stack[-1:] diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py index 3611de6be7e3..28bf8f5ba6f3 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference.py +++ b/sdks/python/apache_beam/typehints/trivial_inference.py @@ -41,11 +41,6 @@ class TypeInferenceError(ValueError): pass -try: - from types import InstanceType -except ImportError: - InstanceType = object - def instance_to_type(o): """Given a Python object o, return the corresponding type hint. """ diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index b90deabf79e7..21d31b044bd4 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -67,6 +67,7 @@ import copy import sys import types + from future.utils import with_metaclass __all__ = [ @@ -991,7 +992,8 @@ def __getitem__(self, type_param): IteratorTypeConstraint = IteratorHint.IteratorTypeConstraint -class WindowedTypeConstraint(with_metaclass(GetitemConstructor, TypeConstraint)): +class WindowedTypeConstraint( + with_metaclass(GetitemConstructor, TypeConstraint)): """A type constraint for WindowedValue objects. Mostly for internal use. diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index d2d424e28d9e..cae8aeed906b 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -72,6 +72,9 @@ ISORT_EXCLUDED=( "iobase_test.py" "fast_coders_test.py" "slow_coders_test.py" + "dataflow_runner.py" + "auth.py" + "hadoopfilesystem.py" ) SKIP_PARAM="" for file in "${ISORT_EXCLUDED[@]}"; do From 1bb0328d314d7ccd59680fa75f901e2aff4195aa Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 15 Feb 2018 05:07:20 -0800 Subject: [PATCH 27/27] Fix instance check --- sdks/python/apache_beam/coders/slow_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index db910346f2db..8af765a5fb11 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -35,7 +35,7 @@ def __init__(self): self.data = [] def write(self, b, nested=False): - assert isinstance(b, basestring, bytes), \ + assert isinstance(b, (basestring, bytes)), \ "%r is not a basestring or bytes it is a %r" % (b, type(b)) if nested: self.write_var_int64(len(b))