diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 8af394b6686c..91768432d915 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -68,7 +68,6 @@ cdef class DeterministicFastPrimitivesCoderImpl(CoderImpl): cdef bint _check_safe(self, value) except -1 -cdef object NoneType cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index b5b17899601f..4bfb19a52a1e 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -28,7 +28,7 @@ """ from __future__ import absolute_import -from types import NoneType +import six from apache_beam.coders import observable from apache_beam.utils import windowed_value @@ -197,7 +197,7 @@ def __init__(self, coder, step_label): self._step_label = step_label def _check_safe(self, value): - if isinstance(value, (str, unicode, long, int, float)): + if isinstance(value, (str, six.text_type, long, int, float)): pass elif value is None: pass @@ -277,7 +277,7 @@ def get_estimated_size_and_observables(self, value, nested=False): def encode_to_stream(self, value, stream, nested): t = type(value) - if t is NoneType: + if value is None: stream.write_byte(NONE_TYPE) elif t is int: stream.write_byte(INT_TYPE) @@ -288,7 +288,7 @@ def encode_to_stream(self, value, stream, nested): elif t is str: stream.write_byte(STR_TYPE) stream.write(value, nested) - elif t is unicode: + elif t is six.text_type: unicode_value = value # for typing stream.write_byte(UNICODE_TYPE) stream.write(unicode_value.encode('utf-8'), nested) @@ -302,7 +302,7 @@ def encode_to_stream(self, value, stream, nested): dict_value = value # for typing stream.write_byte(DICT_TYPE) stream.write_var_int64(len(dict_value)) - for k, v in dict_value.iteritems(): + for k, v in six.iteritems(dict_value): self.encode_to_stream(k, stream, True) self.encode_to_stream(v, stream, True) elif t is bool: diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index f76625869879..f3c99f730f31 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -22,17 +22,22 @@ from __future__ import absolute_import import base64 -import cPickle as pickle +# pylint: disable=ungrouped-imports import google.protobuf +import six from google.protobuf import wrappers_pb2 +import six.moves.cPickle as pickle from apache_beam.coders import coder_impl from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import proto_utils +# pylint: enable=ungrouped-imports + + # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import get_varint_size @@ -309,7 +314,7 @@ class ToStringCoder(Coder): """A default string coder used if no sink coder is specified.""" def encode(self, value): - if isinstance(value, unicode): + if isinstance(value, six.text_type): return value.encode('utf-8') elif isinstance(value, str): return value diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py index a13334a2c26f..32089ced0e0a 100644 --- a/sdks/python/apache_beam/coders/fast_coders_test.py +++ 
b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -20,7 +20,6 @@ import logging import unittest - # Run all the standard coder test cases. from apache_beam.coders.coders_test_common import * diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py index 97aa39ca094f..ff4693e16d7a 100644 --- a/sdks/python/apache_beam/coders/slow_coders_test.py +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -20,7 +20,6 @@ import logging import unittest - # Run all the standard coder test cases. from apache_beam.coders.coders_test_common import * diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index 15bc5eb9ba93..e509eadbd3f4 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -17,6 +17,8 @@ """Tests for the stream implementations.""" +from __future__ import absolute_import + import logging import math import unittest diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index dd071d7a9331..96b71744b9a7 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -64,6 +64,10 @@ def MakeXyzs(v): See apache_beam.typehints.decorators module for more details. """ +from __future__ import absolute_import + +import six + from apache_beam.coders import coders from apache_beam.typehints import typehints @@ -84,7 +88,7 @@ def register_standard_coders(self, fallback_coder): self._register_coder_internal(float, coders.FloatCoder) self._register_coder_internal(str, coders.BytesCoder) self._register_coder_internal(bytes, coders.BytesCoder) - self._register_coder_internal(unicode, coders.StrUtf8Coder) + self._register_coder_internal(six.text_type, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) # Default fallback coders applied in that order until the first matching # coder found. diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 30fc8903283c..71cf5b721475 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -42,7 +42,6 @@ Avro file. """ -import cStringIO import os import zlib from functools import partial @@ -60,6 +59,11 @@ from apache_beam.io.iobase import Read from apache_beam.transforms import PTransform +try: + from cStringIO import StringIO as BytesIO +except ImportError: + from io import BytesIO + __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro'] @@ -311,7 +315,7 @@ def _decompress_bytes(data, codec): # We take care to avoid extra copies of data while slicing large objects # by use of a buffer. 
result = snappy.decompress(buffer(data)[:-4]) - avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result) + avroio.BinaryDecoder(BytesIO(data[-4:])).check_crc32(result) return result else: raise ValueError('Unknown codec: %r', codec) @@ -321,7 +325,7 @@ def num_records(self): def records(self): decoder = avroio.BinaryDecoder( - cStringIO.StringIO(self._decompressed_block_bytes)) + BytesIO(self._decompressed_block_bytes)) reader = avroio.DatumReader( writers_schema=self._schema, readers_schema=self._schema) diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 0f7dd547e76e..c99add1dd7be 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -91,10 +91,10 @@ def test_conact_source(self): RangeSource(12, 16), ]) self.assertEqual(list(source.read(source.get_range_tracker())), - range(16)) + list(range(16))) self.assertEqual(list(source.read(source.get_range_tracker((1, None), (2, 10)))), - range(4, 10)) + list(range(4, 10))) range_tracker = source.get_range_tracker(None, None) self.assertEqual(range_tracker.position_at_fraction(0), (0, 0)) self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8)) @@ -176,10 +176,11 @@ def test_single_source(self): read_all = source_test_utils.read_from_source range10 = RangeSource(0, 10) - self.assertEquals(read_all(ConcatSource([range10])), range(10)) - self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10)) + self.assertEquals(read_all(ConcatSource([range10])), list(range(10))) + self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), + list(range(5, 10))) self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)), - range(5)) + list(range(5))) def test_source_with_empty_ranges(self): read_all = source_test_utils.read_from_source @@ -189,11 +190,11 @@ def test_source_with_empty_ranges(self): range10 = RangeSource(0, 10) self.assertEquals(read_all(ConcatSource([empty, empty, range10])), - range(10)) + list(range(10))) self.assertEquals(read_all(ConcatSource([empty, range10, empty])), - range(10)) + list(range(10))) self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])), - range(10) + range(10)) + list(range(10)) + list(range(10))) def test_source_with_empty_ranges_exhastive(self): empty = RangeSource(0, 0) @@ -214,7 +215,7 @@ def test_run_concat_direct(self): ]) pipeline = TestPipeline() pcoll = pipeline | beam.io.Read(source) - assert_that(pcoll, equal_to(range(1000))) + assert_that(pcoll, equal_to(list(range(1000)))) pipeline.run() diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index 5b3bbf200a14..ac3e43826fb2 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -25,6 +25,8 @@ import time import uuid +import six + from apache_beam.internal import util from apache_beam.io import iobase from apache_beam.io.filesystem import BeamIOError @@ -73,10 +75,10 @@ def __init__(self, ~exceptions.ValueError: if **shard_name_template** is not of expected format. 
""" - if not isinstance(file_path_prefix, (basestring, ValueProvider)): + if not isinstance(file_path_prefix, (six.string_types, ValueProvider)): raise TypeError('file_path_prefix must be a string or ValueProvider;' 'got %r instead' % file_path_prefix) - if not isinstance(file_name_suffix, (basestring, ValueProvider)): + if not isinstance(file_name_suffix, (six.string_types, ValueProvider)): raise TypeError('file_name_suffix must be a string or ValueProvider;' 'got %r instead' % file_name_suffix) @@ -87,9 +89,9 @@ def __init__(self, shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE elif shard_name_template == '': num_shards = 1 - if isinstance(file_path_prefix, basestring): + if isinstance(file_path_prefix, six.string_types): file_path_prefix = StaticValueProvider(str, file_path_prefix) - if isinstance(file_name_suffix, basestring): + if isinstance(file_name_suffix, six.string_types): file_name_suffix = StaticValueProvider(str, file_name_suffix) self.file_path_prefix = file_path_prefix self.file_name_suffix = file_name_suffix @@ -221,7 +223,7 @@ def _rename_batch(batch): except BeamIOError as exp: if exp.exception_details is None: raise - for (src, dest), exception in exp.exception_details.iteritems(): + for (src, dest), exception in exp.exception_details.items(): if exception: logging.warning('Rename not successful: %s -> %s, %s', src, dest, exception) @@ -243,7 +245,7 @@ def _rename_batch(batch): return exceptions exception_batches = util.run_using_threadpool( - _rename_batch, zip(source_file_batch, destination_file_batch), + _rename_batch, list(zip(source_file_batch, destination_file_batch)), num_threads) all_exceptions = [e for exception_batch in exception_batches diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a80896c78181..e757fe242ec5 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -26,7 +26,9 @@ :class:`~apache_beam.io._AvroSource`. """ -from six import integer_types +from __future__ import absolute_import + +import six from apache_beam.internal import pickler from apache_beam.io import concat_source @@ -98,12 +100,12 @@ def __init__(self, result. """ - if not isinstance(file_pattern, (basestring, ValueProvider)): + if not isinstance(file_pattern, (six.string_types, ValueProvider)): raise TypeError('%s: file_pattern must be of type string' ' or ValueProvider; got %r instead' % (self.__class__.__name__, file_pattern)) - if isinstance(file_pattern, basestring): + if isinstance(file_pattern, six.string_types): file_pattern = StaticValueProvider(str, file_pattern) self._pattern = file_pattern @@ -234,11 +236,11 @@ class _SingleFileSource(iobase.BoundedSource): def __init__(self, file_based_source, file_name, start_offset, stop_offset, min_bundle_size=0, splittable=True): - if not isinstance(start_offset, integer_types): + if not isinstance(start_offset, six.integer_types): raise TypeError( 'start_offset must be a number. Received: %r' % start_offset) if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY: - if not isinstance(stop_offset, integer_types): + if not isinstance(stop_offset, six.integer_types): raise TypeError( 'stop_offset must be a number. 
Received: %r' % stop_offset) if start_offset >= stop_offset: diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 0110c3f683c8..5cb6b1d06656 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -183,16 +183,16 @@ def setUp(self): filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2 def test_read(self): - sources = [TestConcatSource.DummySource(range(start, start + 10)) for start - in [0, 10, 20]] + sources = [TestConcatSource.DummySource(range(start, start + 10)) + for start in [0, 10, 20]] concat = ConcatSource(sources) range_tracker = concat.get_range_tracker(None, None) read_data = [value for value in concat.read(range_tracker)] - self.assertItemsEqual(range(30), read_data) + self.assertItemsEqual(list(range(30)), read_data) def test_split(self): - sources = [TestConcatSource.DummySource(range(start, start + 10)) for start - in [0, 10, 20]] + sources = [TestConcatSource.DummySource(range(start, start + 10)) + for start in [0, 10, 20]] concat = ConcatSource(sources) splits = [split for split in concat.split()] self.assertEquals(6, len(splits)) @@ -205,11 +205,11 @@ def test_split(self): split.stop_position) read_data.extend([value for value in split.source.read( range_tracker_for_split)]) - self.assertItemsEqual(range(30), read_data) + self.assertItemsEqual(list(range(30)), read_data) def test_estimate_size(self): - sources = [TestConcatSource.DummySource(range(start, start + 10)) for start - in [0, 10, 20]] + sources = [TestConcatSource.DummySource(range(start, start + 10)) + for start in [0, 10, 20]] concat = ConcatSource(sources) self.assertEquals(30, concat.estimate_size()) diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 09739dc94454..87bba7e8deac 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -20,16 +20,20 @@ import abc import bz2 -import cStringIO import logging import os import time import zlib -from six import integer_types +import six from apache_beam.utils.plugin import BeamPlugin +try: + from cStringIO import StringIO as BytesIO +except ImportError: + from io import BytesIO + logger = logging.getLogger(__name__) DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 @@ -82,7 +86,7 @@ def detect_compression_type(cls, file_path): """Returns the compression type of a file (based on its suffix).""" compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP} lowercased_path = file_path.lower() - for suffix, compression_type in compression_types_by_suffix.iteritems(): + for suffix, compression_type in compression_types_by_suffix.items(): if lowercased_path.endswith(suffix): return compression_type return cls.UNCOMPRESSED @@ -122,7 +126,7 @@ def __init__(self, if self.readable(): self._read_size = read_size - self._read_buffer = cStringIO.StringIO() + self._read_buffer = BytesIO() self._read_position = 0 self._read_eof = False @@ -237,7 +241,7 @@ def readline(self): if not self._decompressor: raise ValueError('decompressor not initialized') - io = cStringIO.StringIO() + output_stream = BytesIO() while True: # Ensure that the internal buffer has at least half the read_size. 
Going # with half the _read_size (as opposed to a full _read_size) to ensure @@ -246,11 +250,11 @@ def readline(self): self._fetch_to_internal_buffer(self._read_size / 2) line = self._read_from_internal_buffer( lambda: self._read_buffer.readline()) - io.write(line) + output_stream.write(line) if line.endswith('\n') or not line: break # Newline or EOF reached. - return io.getvalue() + return output_stream.getvalue() def closed(self): return not self._file or self._file.closed() @@ -373,10 +377,12 @@ class FileMetadata(object): """Metadata about a file path that is the output of FileSystem.match """ def __init__(self, path, size_in_bytes): - assert isinstance(path, basestring) and path, "Path should be a string" - assert isinstance(size_in_bytes, integer_types) and size_in_bytes >= 0, \ - "Invalid value for size_in_bytes should %s (of type %s)" % ( - size_in_bytes, type(size_in_bytes)) + assert isinstance(path, six.string_types) and path, \ + "Path should be a string" + assert (isinstance(size_in_bytes, six.integer_types) + and size_in_bytes >= 0), \ + ("Invalid value for size_in_bytes should %s (of type %s)" + % (size_in_bytes, type(size_in_bytes))) self.path = path self.size_in_bytes = size_in_bytes @@ -423,14 +429,13 @@ def __init__(self, msg, exception_details=None): self.exception_details = exception_details -class FileSystem(BeamPlugin): +class FileSystem(six.with_metaclass(abc.ABCMeta, BeamPlugin)): """A class that defines the functions that can be performed on a filesystem. All methods are abstract and they are for file system providers to implement. Clients should use the FileSystems class to interact with the correct file system based on the provided file pattern scheme. """ - __metaclass__ = abc.ABCMeta CHUNK_SIZE = 1 # Chuck size in the batch operations def __init__(self, pipeline_options): diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py index 35e141bb7566..1572c6d41419 100644 --- a/sdks/python/apache_beam/io/filesystemio.py +++ b/sdks/python/apache_beam/io/filesystemio.py @@ -16,22 +16,24 @@ # """Utilities for ``FileSystem`` implementations.""" +from __future__ import absolute_import + import abc import io import os +import six + __all__ = ['Downloader', 'Uploader', 'DownloaderStream', 'UploaderStream', 'PipeStream'] -class Downloader(object): +class Downloader(six.with_metaclass(abc.ABCMeta, object)): """Download interface for a single file. Implementations should support random access reads. """ - __metaclass__ = abc.ABCMeta - @abc.abstractproperty def size(self): """Size of file to download.""" @@ -52,11 +54,9 @@ def get_range(self, start, end): """ -class Uploader(object): +class Uploader(six.with_metaclass(abc.ABCMeta, object)): """Upload interface for a single file.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def put(self, data): """Write data to file sequentially. 
diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py index d8cc97115508..d936e313af38 100644 --- a/sdks/python/apache_beam/io/filesystems_test.py +++ b/sdks/python/apache_beam/io/filesystems_test.py @@ -123,7 +123,7 @@ def test_match_file_exception(self): with self.assertRaisesRegexp(BeamIOError, r'^Unable to get the Filesystem') as error: FileSystems.match([None]) - self.assertEqual(error.exception.exception_details.keys(), [None]) + self.assertEqual(list(error.exception.exception_details.keys()), [None]) def test_match_directory(self): path1 = os.path.join(self.tmpdir, 'f1') @@ -157,7 +157,8 @@ def test_copy_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Copy operation failed') as error: FileSystems.copy([path1], [path2]) - self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + self.assertEqual(list(error.exception.exception_details.keys()), + [(path1, path2)]) def test_copy_directory(self): path_t1 = os.path.join(self.tmpdir, 't1') @@ -189,7 +190,8 @@ def test_rename_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Rename operation failed') as error: FileSystems.rename([path1], [path2]) - self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + self.assertEqual(list(error.exception.exception_details.keys()), + [(path1, path2)]) def test_rename_directory(self): path_t1 = os.path.join(self.tmpdir, 't1') @@ -230,4 +232,4 @@ def test_delete_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Delete operation failed') as error: FileSystems.delete([path1]) - self.assertEqual(error.exception.exception_details.keys(), [path1]) + self.assertEqual(list(error.exception.exception_details.keys()), [path1]) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index a79ad5e39859..109cecafc540 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -120,6 +120,8 @@ import time import uuid +import six + from apache_beam import coders from apache_beam.internal.gcp import auth from apache_beam.internal.gcp.json_value import from_json_value @@ -210,7 +212,7 @@ def decode(self, encoded_table_row): od = json.loads( encoded_table_row, object_pairs_hook=collections.OrderedDict) return bigquery.TableRow( - f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()]) + f=[bigquery.TableCell(v=to_json_value(e)) for e in six.itervalues(od)]) def parse_table_schema_from_json(schema_string): @@ -522,7 +524,7 @@ def __init__(self, table, dataset=None, project=None, schema=None, self.table_reference = _parse_table_reference(table, dataset, project) # Transform the table schema into a bigquery.TableSchema instance. - if isinstance(schema, basestring): + if isinstance(schema, six.string_types): # TODO(silviuc): Should add a regex-based validation of the format. 
table_schema = bigquery.TableSchema() schema_list = [s.strip(' ') for s in schema.split(',')] @@ -1101,7 +1103,7 @@ def insert_rows(self, project_id, dataset_id, table_id, rows): final_rows = [] for row in rows: json_object = bigquery.JsonObject() - for k, v in row.iteritems(): + for k, v in six.iteritems(row): json_object.additionalProperties.append( bigquery.JsonObject.AdditionalProperty( key=k, value=to_json_value(v))) @@ -1413,7 +1415,7 @@ def get_dict_table_schema(schema): return schema elif schema is None: return schema - elif isinstance(schema, basestring): + elif isinstance(schema, six.string_types): table_schema = WriteToBigQuery.get_table_schema_from_string(schema) return WriteToBigQuery.table_schema_to_dict(table_schema) elif isinstance(schema, bigquery.TableSchema): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index ff6721e6d91a..2a4c31f5be74 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -698,7 +698,7 @@ def test_rows_are_written(self): sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14} expected_rows = [] json_object = bigquery.JsonObject() - for k, v in sample_row.iteritems(): + for k, v in sample_row.items(): json_object.additionalProperties.append( bigquery.JsonObject.AdditionalProperty( key=k, value=to_json_value(v))) diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py index e131f93d5207..5b7d56ae9244 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py @@ -179,7 +179,8 @@ def check_DatastoreWriteFn(self, num_entities): entities = [e.entity for e in fake_datastore.create_entities(num_entities)] - expected_mutations = map(WriteToDatastore.to_upsert_mutation, entities) + expected_mutations = list(map(WriteToDatastore.to_upsert_mutation, + entities)) actual_mutations = [] self._mock_datastore.commit.side_effect = ( @@ -217,7 +218,7 @@ def test_DatastoreWriteLargeEntities(self): def verify_unique_keys(self, queries): """A helper function that verifies if all the queries have unique keys.""" - keys, _ = zip(*queries) + keys, _ = list(zip(*queries)) keys = set(keys) self.assertEqual(len(keys), len(queries)) diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index b86a2fa01455..45472e64541b 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -20,12 +20,16 @@ For internal use only; no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import + import errno import logging import sys import time from socket import error as SocketError +import six + # pylint: disable=ungrouped-imports from apache_beam.internal.gcp import auth from apache_beam.utils import retry @@ -251,11 +255,14 @@ def make_kind_stats_query(namespace, kind, latest_timestamp): else: kind_stat_query.kind.add().name = '__Stat_Ns_Kind__' - kind_filter = datastore_helper.set_property_filter( - query_pb2.Filter(), 'kind_name', PropertyFilter.EQUAL, unicode(kind)) - timestamp_filter = datastore_helper.set_property_filter( - query_pb2.Filter(), 'timestamp', PropertyFilter.EQUAL, - latest_timestamp) + kind_filter = datastore_helper.set_property_filter(query_pb2.Filter(), + 'kind_name', + PropertyFilter.EQUAL, + six.text_type(kind)) + timestamp_filter = datastore_helper.set_property_filter(query_pb2.Filter(), + 'timestamp', + PropertyFilter.EQUAL, + latest_timestamp) datastore_helper.set_composite_filter(kind_stat_query.filter, CompositeFilter.AND, kind_filter, diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index ce8b5e6e4244..8c4aa7cc9694 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -18,6 +18,8 @@ from __future__ import absolute_import +import six + from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystem import CompressedFile from apache_beam.io.filesystem import CompressionTypes @@ -123,7 +125,7 @@ def _match(pattern, limit): pattern += '*' file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit) metadata_list = [FileMetadata(path, size) - for path, size in file_sizes.iteritems()] + for path, size in six.iteritems(file_sizes)] return MatchResult(pattern, metadata_list) exceptions = {} diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 3bdf2e64ca2e..c39ad6225093 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -20,7 +20,6 @@ https://github.com/GoogleCloudPlatform/appengine-gcs-client. """ -import cStringIO import errno import fnmatch import io @@ -41,6 +40,13 @@ from apache_beam.io.filesystemio import UploaderStream from apache_beam.utils import retry +# pylint: disable=ungrouped-imports +try: + from cStringIO import StringIO as BytesIO +except ImportError: + from io import BytesIO +# pylint: enable=ungrouped-imports + __all__ = ['GcsIO'] @@ -490,7 +496,7 @@ def __init__(self, client, path, buffer_size): self._get_request.generation = metadata.generation # Initialize read buffer state. 
- self._download_stream = cStringIO.StringIO() + self._download_stream = BytesIO() self._downloader = transfer.Download( self._download_stream, auto_transfer=False, chunksize=self._buffer_size) self._client.objects.Get(self._get_request, download=self._downloader) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 98aa884c71dc..f67fde67a8f5 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -26,6 +26,8 @@ import re +import six + from apache_beam import coders from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write @@ -72,7 +74,7 @@ def expand(self, pvalue): pcoll = pvalue.pipeline | Read(self._source) pcoll.element_type = bytes pcoll = pcoll | 'DecodeString' >> Map(lambda b: b.decode('utf-8')) - pcoll.element_type = unicode + pcoll.element_type = six.text_type return pcoll diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 8bd9fa4f41aa..848c4b6cc3b5 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -17,10 +17,13 @@ """Unit tests for PubSub sources and sinks.""" +from __future__ import absolute_import + import logging import unittest import hamcrest as hc +import six import apache_beam as beam from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub @@ -52,7 +55,7 @@ def test_expand_with_topic(self): None, 'a_label') | beam.Map(lambda x: x)) # Ensure that the output type is str. - self.assertEqual(unicode, pcoll.element_type) + self.assertEqual(six.text_type, pcoll.element_type) # Apply the necessary PTransformOverrides. overrides = _get_transform_overrides(p.options) @@ -76,7 +79,7 @@ def test_expand_with_subscription(self): 'a_label') | beam.Map(lambda x: x)) # Ensure that the output type is str - self.assertEqual(unicode, pcoll.element_type) + self.assertEqual(six.text_type, pcoll.element_type) # Apply the necessary PTransformOverrides. 
overrides = _get_transform_overrides(p.options) diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py index 2ba1da26468b..fbfd6439fd17 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py @@ -116,7 +116,7 @@ def list(self, path, status=False): raise ValueError('list must be called on a directory, got file: %s', path) result = [] - for file in self.files.itervalues(): + for file in self.files.values(): if file.stat['path'].startswith(path): fs = file.get_file_status() result.append((fs[hdfs._FILE_STATUS_PATH_SUFFIX], fs)) diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py index 29b68f61c34d..205fef86c653 100644 --- a/sdks/python/apache_beam/io/localfilesystem_test.py +++ b/sdks/python/apache_beam/io/localfilesystem_test.py @@ -145,7 +145,7 @@ def test_match_file_exception(self): with self.assertRaisesRegexp(BeamIOError, r'^Match operation failed') as error: self.fs.match([None]) - self.assertEqual(error.exception.exception_details.keys(), [None]) + self.assertEqual(list(error.exception.exception_details.keys()), [None]) def test_match_glob(self): path1 = os.path.join(self.tmpdir, 'f1') @@ -179,7 +179,8 @@ def test_copy_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Copy operation failed') as error: self.fs.copy([path1], [path2]) - self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + self.assertEqual(list(error.exception.exception_details.keys()), + [(path1, path2)]) def test_copy_directory(self): path_t1 = os.path.join(self.tmpdir, 't1') @@ -211,7 +212,8 @@ def test_rename_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Rename operation failed') as error: self.fs.rename([path1], [path2]) - self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + self.assertEqual(list(error.exception.exception_details.keys()), + [(path1, path2)]) def test_rename_directory(self): path_t1 = os.path.join(self.tmpdir, 't1') @@ -252,4 +254,4 @@ def test_delete_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Delete operation failed') as error: self.fs.delete([path1]) - self.assertEqual(error.exception.exception_details.keys(), [path1]) + self.assertEqual(list(error.exception.exception_details.keys()), [path1]) diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py index c540cfae0502..dc1dff604caa 100644 --- a/sdks/python/apache_beam/io/tfrecordio_test.py +++ b/sdks/python/apache_beam/io/tfrecordio_test.py @@ -445,7 +445,7 @@ def test_end2end_example_proto(self): file_path_prefix = temp_dir.create_temp_file('result') example = tf.train.Example() - example.features.feature['int'].int64_list.value.extend(range(3)) + example.features.feature['int'].int64_list.value.extend(list(range(3))) example.features.feature['bytes'].bytes_list.value.extend( [b'foo', b'bar']) diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py index a0206d450762..5b85026209f8 100644 --- a/sdks/python/apache_beam/io/vcfio.py +++ b/sdks/python/apache_beam/io/vcfio.py @@ -26,6 +26,7 @@ import traceback from collections import namedtuple +import six import vcf from apache_beam.coders import coders @@ -369,7 +370,7 @@ def _convert_to_variant_record(self, record, infos, formats): if record.FILTER is not None: variant.filters.extend( record.FILTER if record.FILTER else [PASS_FILTER]) - for k, v 
in record.INFO.iteritems(): + for k, v in six.iteritems(record.INFO): # Special case: END info value specifies end of the record, so adjust # variant.end and do not include it as part of variant.info. if k == END_INFO_KEY: @@ -404,7 +405,7 @@ def _convert_to_variant_record(self, record, infos, formats): # Note: this is already done for INFO fields in PyVCF. if (field in formats and formats[field].num is None and - isinstance(data, (int, float, long, basestring, bool))): + isinstance(data, (int, float, long, six.string_types, bool))): data = [data] call.info[field] = data variant.calls.append(call) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 7a2cd4bf1e40..2ebf5c05e27d 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -175,7 +175,7 @@ def from_dictionary(cls, options): A PipelineOptions object representing the given arguments. """ flags = [] - for k, v in options.iteritems(): + for k, v in options.items(): if isinstance(v, bool): if v: flags.append('--%s' % k) @@ -233,7 +233,7 @@ def _visible_option_list(self): for option in dir(self._visible_options) if option[0] != '_') def __dir__(self): - return sorted(dir(type(self)) + self.__dict__.keys() + + return sorted(dir(type(self)) + list(self.__dict__.keys()) + self._visible_option_list()) def __getattr__(self, name): diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 71d97ba5d21f..f0e41290cca0 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -53,6 +53,8 @@ import shutil import tempfile +import six + from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems @@ -859,7 +861,7 @@ def is_side_input(tag): return result -class PTransformOverride(object): +class PTransformOverride(six.with_metaclass(abc.ABCMeta, object)): """For internal use only; no backwards-compatibility guarantees. Gives a matcher and replacements for matching PTransforms. @@ -867,7 +869,6 @@ class PTransformOverride(object): TODO: Update this to support cases where input and/our output types are different. """ - __metaclass__ = abc.ABCMeta @abc.abstractmethod def matches(self, applied_ptransform): diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 236c14bcbc60..bdb1bf3a2f10 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -29,6 +29,8 @@ import collections import itertools +import six + from apache_beam import coders from apache_beam import typehints from apache_beam.internal import pickler @@ -259,7 +261,7 @@ class TaggedOutput(object): """ def __init__(self, tag, value): - if not isinstance(tag, basestring): + if not isinstance(tag, six.string_types): raise TypeError( 'Attempting to create a TaggedOutput with non-string tag %s' % tag) self.tag = tag diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d5ca68307aa5..df30db4a15b4 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -22,9 +22,13 @@ For internal use only; no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import + import sys import traceback +import six + from apache_beam.internal import util from apache_beam.metrics.execution import ScopedMetricsContainer from apache_beam.pvalue import TaggedOutput @@ -512,7 +516,7 @@ def _reraise_augmented(self, exn): traceback.format_exception_only(type(exn), exn)[-1].strip() + step_annotation) new_exn._tagged_with_step = True - raise new_exn, None, original_traceback + six.reraise(new_exn, None, original_traceback) class OutputProcessor(object): @@ -549,7 +553,7 @@ def process_outputs(self, windowed_input_element, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, basestring): + if not isinstance(tag, six.string_types): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value if isinstance(result, WindowedValue): @@ -591,7 +595,7 @@ def finish_bundle_outputs(self, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, basestring): + if not isinstance(tag, six.string_types): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index 2e0bc8209ecf..fe5c67cc9775 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -145,7 +145,7 @@ def _populate_metric_results(self, response): # Now we create the MetricResult elements. result = [] - for metric_key, metric in metrics_by_name.iteritems(): + for metric_key, metric in metrics_by_name.items(): attempted = self._get_metric_value(metric['tentative']) committed = self._get_metric_value(metric['committed']) if attempted is None or committed is None: diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py index dd3cbe1156a4..79738700269b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -34,7 +34,7 @@ class DictToObject(object): """Translate from a dict(list()) structure to an object structure""" def __init__(self, data): - for name, value in data.iteritems(): + for name, value in data.items(): setattr(self, name, self._wrap(value)) def _wrap(self, value): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index bfec89310e9e..fbbfb97d036b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -25,7 +25,6 @@ import threading import time import traceback -import urllib from collections import defaultdict import apache_beam as beam @@ -51,6 +50,8 @@ from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils.plugin import BeamPlugin +from six.moves.urllib.parse import quote +from six.moves.urllib.parse import unquote __all__ = ['DataflowRunner'] @@ -883,12 +884,12 @@ def deserialize_windowing_strategy(cls, serialized_data): @staticmethod def byte_array_to_json_string(raw_bytes): """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString.""" - return urllib.quote(raw_bytes) + return quote(raw_bytes) @staticmethod def json_string_to_byte_array(encoded_string): """Implements 
org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray.""" - return urllib.unquote(encoded_string) + return unquote(encoded_string) class DataflowPipelineResult(PipelineResult): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 1cf80b799021..018668952793 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -19,6 +19,8 @@ Dataflow client utility functions.""" +from __future__ import absolute_import + import codecs import getpass import json @@ -29,6 +31,7 @@ from datetime import datetime from StringIO import StringIO +import six from apitools.base.py import encoding from apitools.base.py import exceptions @@ -251,7 +254,7 @@ def __init__(self, packages, options, environment_version, pipeline_url): dataflow.Environment.SdkPipelineOptionsValue()) options_dict = {k: v - for k, v in sdk_pipeline_options.iteritems() + for k, v in sdk_pipeline_options.items() if v is not None} options_dict["pipelineUrl"] = pipeline_url self.proto.sdkPipelineOptions.additionalProperties.append( @@ -287,7 +290,7 @@ def encode_shortstrings(input_buffer, errors='strict'): def decode_shortstrings(input_buffer, errors='strict'): """Decoder (to Unicode) that suppresses long base64 strings.""" shortened, length = encode_shortstrings(input_buffer, errors) - return unicode(shortened), length + return six.text_type(shortened), length def shortstrings_registerer(encoding_name): if encoding_name == 'shortstrings': diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py index 805473a8838c..35c6ed242984 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py @@ -49,7 +49,7 @@ def _matches(self, item): if self.origin != IGNORED and item.origin != self.origin: return False if self.context != IGNORED: - for key, name in self.context.iteritems(): + for key, name in self.context.items(): if key not in item.context: return False if name != IGNORED and item.context[key] != name: diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 2f2316f6f1d0..cba7b22d5e9f 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -31,7 +31,7 @@ def _dict_printable_fields(dict_object, skip_fields): """Returns a list of strings for the interesting fields of a dict.""" return ['%s=%r' % (name, value) - for name, value in dict_object.iteritems() + for name, value in dict_object.items() # want to output value 0 but not None nor [] if (value or value == 0) and name not in skip_fields] diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 46176c9e969e..1f0bdbfa4a1f 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -231,7 +231,7 @@ def handle_result( # Commit partial GBK states existing_keyed_state = self._transform_keyed_states[result.transform] - for k, v in result.partial_keyed_state.iteritems(): + for k, v in result.partial_keyed_state.items(): 
existing_keyed_state[k] = v return committed_bundles diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 34c12c345a0e..0c18a9d35afe 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -22,12 +22,14 @@ import collections import itertools import logging -import Queue import sys import threading import traceback from weakref import WeakValueDictionary +import six + +import six.moves.queue as queue from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import ScopedMetricsContainer @@ -76,7 +78,7 @@ def _get_task_or_none(self): # shutdown. return self.queue.get( timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT) - except Queue.Empty: + except queue.Empty: return None def run(self): @@ -95,7 +97,7 @@ def shutdown(self): self.shutdown_requested = True def __init__(self, num_workers): - self.queue = Queue.Queue() + self.queue = queue.Queue() self.workers = [_ExecutorService._ExecutorServiceWorker( self.queue, i) for i in range(num_workers)] self.shutdown_requested = False @@ -120,7 +122,7 @@ def shutdown(self): try: self.queue.get_nowait() self.queue.task_done() - except Queue.Empty: + except queue.Empty: continue # All existing threads will eventually terminate (after they complete their # last task). @@ -398,7 +400,7 @@ def await_completion(self): try: if update.exception: t, v, tb = update.exc_info - raise t, v, tb + six.reraise(t, v, tb) finally: self.executor_service.shutdown() self.executor_service.await_completion() @@ -438,14 +440,14 @@ class _TypedUpdateQueue(object): def __init__(self, item_type): self._item_type = item_type - self._queue = Queue.Queue() + self._queue = queue.Queue() def poll(self): try: item = self._queue.get_nowait() self._queue.task_done() return item - except Queue.Empty: + except queue.Empty: return None def take(self): @@ -458,7 +460,7 @@ def take(self): item = self._queue.get(timeout=1) self._queue.task_done() return item - except Queue.Empty: + except queue.Empty: pass def offer(self, item): diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index cbd2c9fbc254..cd5e533ffb38 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -563,7 +563,7 @@ def process_element(self, element): def finish_bundle(self): self.runner.finish() - bundles = self._tagged_receivers.values() + bundles = list(self._tagged_receivers.values()) result_counters = self._counter_factory.get_counters() return TransformResult( self, bundles, [], result_counters, None, @@ -705,7 +705,7 @@ def process_element(self, element): def finish_bundle(self): bundles = [] bundle = None - for encoded_k, vs in self.gbk_items.iteritems(): + for encoded_k, vs in self.gbk_items.items(): if not bundle: bundle = self._evaluation_context.create_bundle( self.output_pcollection) diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 084073f4fe71..3634cdd448cc 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -194,7 +194,7 @@ def output_watermark(self): def hold(self, keyed_earliest_holds): with self._lock: - for key, hold_value in keyed_earliest_holds.iteritems(): + for key, 
hold_value in keyed_earliest_holds.items(): self._keyed_earliest_holds[key] = hold_value if (hold_value is None or hold_value == WatermarkManager.WATERMARK_POS_INF): @@ -256,7 +256,7 @@ def extract_transform_timers(self): with self._lock: fired_timers = [] has_realtime_timer = False - for encoded_key, state in self._keyed_states.iteritems(): + for encoded_key, state in self._keyed_states.items(): timers, had_realtime_timer = state.get_timers( watermark=self._input_watermark, processing_time=self._clock.time()) diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py index 84bed4270bc7..4dcd074d92bf 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py @@ -59,7 +59,7 @@ def run(self, pipeline): # Submit the job to the RPC co-process jobName = ('Job-' + ''.join(random.choice(string.ascii_uppercase) for _ in range(6))) - options = {k: v for k, v in pipeline._options.get_all_options().iteritems() + options = {k: v for k, v in pipeline._options.get_all_options().items() if v is not None} try: diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index c36bae7114a6..e0fadd892fea 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -20,7 +20,6 @@ import collections import copy import logging -import Queue as queue import re import threading import time @@ -28,7 +27,9 @@ import grpc -import apache_beam as beam # pylint: disable=ungrouped-imports +# pylint: disable=ungrouped-imports,wrong-import-order +import apache_beam as beam +import six.moves.queue as queue from apache_beam import coders from apache_beam import metrics from apache_beam.coders import WindowedValueCoder @@ -50,6 +51,9 @@ from apache_beam.transforms.window import GlobalWindows from apache_beam.utils import proto_utils +# pylint: enable=ungrouped-imports,wrong-import-order + + # This module is experimental. No backwards-compatibility guarantees. @@ -1162,7 +1166,7 @@ def run(self): self._latest_progress = progress_result.process_bundle_progress if self._callback: self._callback(self._latest_progress) - except Exception, exn: + except Exception as exn: logging.error("Bad progress: %s", exn) time.sleep(self._frequency) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index e27da621e485..cb56f3d2b57c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import +from __future__ import print_function + import functools import logging import time @@ -66,17 +69,17 @@ def cross_product(elem, sides): res, equal_to([ # The window [0, 5) maps to the window [0, 7). - (0, range(7)), - (1, range(7)), - (2, range(7)), - (3, range(7)), - (4, range(7)), + (0, list(range(7))), + (1, list(range(7))), + (2, list(range(7))), + (3, list(range(7))), + (4, list(range(7))), # The window [5, 10) maps to the window [7, 14). 
- (5, range(7, 10)), - (6, range(7, 10)), - (7, range(7, 10)), - (8, range(7, 10)), - (9, range(7, 10))]), + (5, list(range(7, 10))), + (6, list(range(7, 10))), + (7, list(range(7, 10))), + (8, list(range(7, 10))), + (9, list(range(7, 10)))]), label='windowed') def test_flattened_side_input(self): @@ -218,7 +221,7 @@ def test_progress_metrics(self): m_out.processed_elements.measured.output_element_counts['twice']) except: - print res._metrics_by_stage + print(res._metrics_by_stage) raise # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py index 74c6b03f5a66..df2c38e30eba 100644 --- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py +++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py @@ -391,7 +391,7 @@ def append(self, pair): def freeze(self): if not self.frozen: self._encoded_elements = [self.grouped_coder.encode(kv) - for kv in self.elements.iteritems()] + for kv in self.elements.items()] self.frozen = True return self._encoded_elements diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index a631a0c847bf..32fec0ca6a31 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -18,7 +18,6 @@ import functools import logging import os -import Queue as queue import socket import subprocess import sys @@ -31,6 +30,7 @@ import grpc from google.protobuf import text_format +import six.moves.queue as queue from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_job_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py index 1fc244b20f20..1df3f809cd64 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py @@ -15,6 +15,8 @@ # limitations under the License. # +from __future__ import print_function + import logging import platform import signal @@ -41,13 +43,13 @@ def setUp(self): if platform.system() != 'Windows': def handler(signum, frame): msg = 'Timed out after %s seconds.' 
% self.TIMEOUT_SECS - print '=' * 20, msg, '=' * 20 + print('=' * 20, msg, '=' * 20) traceback.print_stack(frame) threads_by_id = {th.ident: th for th in threading.enumerate()} for thread_id, stack in sys._current_frames().items(): th = threads_by_id.get(thread_id) - print - print '# Thread:', th or thread_id + print() + print('# Thread:', th or thread_id) traceback.print_stack(stack) raise BaseException(msg) signal.signal(signal.SIGALRM, handler) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 2e853b072ca9..5520890cf396 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -26,6 +26,8 @@ import json import logging +import six + import apache_beam as beam from apache_beam.coders import WindowedValueCoder from apache_beam.coders import coder_impl @@ -101,7 +103,7 @@ def __init__(self, operation_name, step_name, consumers, counter_factory, # We must do this manually as we don't have a spec or spec.output_coders. self.receivers = [ operations.ConsumerSet(self.counter_factory, self.step_name, 0, - next(consumers.itervalues()), + next(six.itervalues(consumers)), self.windowed_coder)] def process(self, windowed_value): @@ -281,7 +283,7 @@ def process_bundle(self, instruction_id): try: self.state_sampler.start() # Start all operations. - for op in reversed(self.ops.values()): + for op in reversed(list(self.ops.values())): logging.info('start %s', op) op.start() diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 2e4f2d6f69a7..d6e5d54b2f11 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -24,12 +24,13 @@ import abc import collections import logging -import Queue as queue import sys import threading import grpc +import six +import six.moves.queue as queue from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -49,7 +50,7 @@ def close(self): self._close_callback(self.get()) -class DataChannel(object): +class DataChannel(six.with_metaclass(abc.ABCMeta, object)): """Represents a channel for reading and writing data over the data plane. Read from this channel with the input_elements method:: @@ -68,8 +69,6 @@ class DataChannel(object): data_channel.close() """ - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def input_elements(self, instruction_id, expected_targets): """Returns an iterable of all Element.Data bundles for instruction_id. 
@@ -182,7 +181,7 @@ def input_elements(self, instruction_id, expected_targets): data = received.get(timeout=1) except queue.Empty: if self._exc_info: - raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + six.reraise(self._exc_info[0], self._exc_info[1], self._exc_info[2]) else: if not data.data and data.target in expected_targets: done_targets.append(data.target) @@ -270,11 +269,9 @@ def Data(self, elements_iterator, context): yield elements -class DataChannelFactory(object): +class DataChannelFactory(six.with_metaclass(abc.ABCMeta, object)): """An abstract factory for creating ``DataChannel``.""" - __metaclass__ = abc.ABCMeta - @abc.abstractmethod def create_data_channel(self, remote_grpc_port): """Returns a ``DataChannel`` from the given RemoteGrpcPort.""" diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index 07ba8fd44f1f..b2cefbe1469c 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc +import six from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -50,7 +51,7 @@ def call_fn(): thread.join(timeout_secs) if exc_info: t, v, tb = exc_info # pylint: disable=unbalanced-tuple-unpacking - raise t, v, tb + six.reraise(t, v, tb) assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs return wrapper return decorate diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py index 6d8a1d926713..4cb4b2336e4a 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler.py +++ b/sdks/python/apache_beam/runners/worker/log_handler.py @@ -16,13 +16,15 @@ # """Beam fn API log handler.""" +from __future__ import absolute_import + import logging import math -import Queue as queue import threading import grpc +import six.moves.queue as queue from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index 647b8b7e8b4a..958f081ad87b 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -101,7 +101,7 @@ def _create_test(name, num_logs): lambda self: self._verify_fn_log_handler(num_logs)) -for test_name, num_logs_entries in data.iteritems(): +for test_name, num_logs_entries in data.items(): _create_test(test_name, num_logs_entries) diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py index cf3f69292826..a41547865ea5 100644 --- a/sdks/python/apache_beam/runners/worker/logger_test.py +++ b/sdks/python/apache_beam/runners/worker/logger_test.py @@ -83,7 +83,7 @@ def create_log_record(self, **kwargs): class Record(object): def __init__(self, **kwargs): - for k, v in kwargs.iteritems(): + for k, v in kwargs.items(): setattr(self, k, v) return Record(**kwargs) diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py index bdafbeaf44ad..2bb1aa500ee8 100644 --- a/sdks/python/apache_beam/runners/worker/operation_specs.py +++ b/sdks/python/apache_beam/runners/worker/operation_specs.py @@ -55,7 +55,7 @@ def
worker_printable_fields(workerproto): return ['%s=%s' % (name, value) # _asdict is the only way and cannot subclass this generated class # pylint: disable=protected-access - for name, value in workerproto._asdict().iteritems() + for name, value in workerproto._asdict().items() # want to output value 0 but not None nor [] if (value or value == 0) and name not in diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 11ff909f3e9f..3efe36ecd61e 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -19,10 +19,14 @@ """Worker operations executor.""" +from __future__ import absolute_import + import collections import itertools import logging +import six + from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.io import iobase @@ -529,7 +533,7 @@ def process(self, wkv): target = self.key_count * 9 // 10 old_wkeys = [] # TODO(robertwb): Use an LRU cache? - for old_wkey, old_wvalue in self.table.iteritems(): + for old_wkey, old_wvalue in six.iteritems(self.table): old_wkeys.append(old_wkey) # Can't mutate while iterating. self.output_key(old_wkey, old_wvalue[0]) self.key_count -= 1 @@ -544,7 +548,7 @@ def process(self, wkv): entry[0] = self.combine_fn_add_input(entry[0], value) def finish(self): - for wkey, value in self.table.iteritems(): + for wkey, value in six.iteritems(self.table): self.output_key(wkey, value[0]) self.table = {} self.key_count = 0 diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 2767530adb0b..360964e392a5 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -21,14 +21,15 @@ from __future__ import print_function import logging -import Queue as queue import sys import threading import traceback from concurrent import futures import grpc +import six +import six.moves.queue as queue from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker import bundle_processor @@ -288,7 +289,7 @@ def _blocking_request(self, request): self._requests.put(request) while not future.wait(timeout=1): if self._exc_info: - raise self._exc_info[0], self._exc_info[1], self._exc_info[2] + six.reraise(self._exc_info[0], self._exc_info[1], self._exc_info[2]) elif self._done: raise RuntimeError() del self._responses_by_id[request.id] diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 46670e88964f..20d4366f81a4 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -16,7 +16,6 @@ # """SDK Fn Harness entry point.""" -import BaseHTTPServer import json import logging import os @@ -27,6 +26,7 @@ from google.protobuf import text_format +import six.moves.BaseHTTPServer from apache_beam.internal import pickler from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.dataflow.internal import names @@ -57,7 +57,7 @@ def start(self, status_http_port=0): Default is 0 which means any free unsecured port """ - class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler): + class StatusHttpHandler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler): """HTTP handler for serving stacktraces of all threads.""" def 
do_GET(self): # pylint: disable=invalid-name @@ -73,7 +73,7 @@ def log_message(self, f, *args): """Do not log any messages.""" pass - self.httpd = httpd = BaseHTTPServer.HTTPServer( + self.httpd = httpd = six.moves.BaseHTTPServer.HTTPServer( ('localhost', status_http_port), StatusHttpHandler) logging.info('Status HTTP server running at %s:%s', httpd.server_name, httpd.server_port) @@ -157,10 +157,10 @@ def _get_worker_count(pipeline_options): an int containing the worker_threads to use. Default is 1 """ pipeline_options = pipeline_options.get( - 'options') if pipeline_options.has_key('options') else {} + 'options') if 'options' in pipeline_options else {} experiments = pipeline_options.get( 'experiments' - ) if pipeline_options and pipeline_options.has_key('experiments') else [] + ) if pipeline_options and 'experiments' in pipeline_options else [] experiments = experiments if experiments else [] diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index cc405e0e4771..824af8223b3d 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -19,10 +19,10 @@ import collections import logging -import Queue import threading import traceback +import six.moves.queue as queue from apache_beam.coders import observable from apache_beam.io import iobase from apache_beam.options.value_provider import RuntimeValueProvider @@ -61,13 +61,13 @@ def __init__(self, self.num_reader_threads = min(max_reader_threads, len(self.sources)) # Queue for sources that are to be read. - self.sources_queue = Queue.Queue() + self.sources_queue = queue.Queue() for source in sources: self.sources_queue.put(source) # Queue for elements that have been read. - self.element_queue = Queue.Queue(ELEMENT_QUEUE_SIZE) + self.element_queue = queue.Queue(ELEMENT_QUEUE_SIZE) # Queue for exceptions encountered in reader threads; to be rethrown. - self.reader_exceptions = Queue.Queue() + self.reader_exceptions = queue.Queue() # Whether we have already iterated; this iterable can only be used once. self.already_iterated = False # Whether an error was encountered in any source reader. 
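Aside: the sideinputs.py hunk above leans on the six.moves.queue alias, the same renamed-module shim used throughout this patch. A minimal standalone sketch (illustrative only, not part of the patch) of why the aliased import keeps the call sites identical on both interpreters:

    # six.moves.queue resolves to the py2 Queue module or the py3 queue module,
    # so queue.Queue and queue.Empty spell the same either way.
    import six.moves.queue as queue

    q = queue.Queue(maxsize=2)     # bounded FIFO with the same API under both names
    q.put('element')
    assert q.get(timeout=0.1) == 'element'
    try:
        q.get_nowait()             # queue is empty now
    except queue.Empty:
        pass                       # same exception type the worker code catches
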
@@ -137,7 +137,7 @@ def _reader_thread(self):
             self.element_queue.put(value)
           else:
             self.element_queue.put(_globally_windowed(value))
-    except Queue.Empty:
+    except queue.Empty:
       return
     except Exception as e:  # pylint: disable=broad-except
       logging.error('Encountered exception in PrefetchingSourceSetIterable '
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 212dc19fde9b..5d6aa8aecf55 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -77,7 +77,7 @@ def test_single_source_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=2)
-    assert list(strip_windows(iterator_fn())) == range(6)
+    assert list(strip_windows(iterator_fn())) == list(range(6))
 
   def test_bytes_read_behind_experiment(self):
     mock_read_counter = mock.MagicMock()
@@ -115,7 +115,7 @@ def test_multiple_sources_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=3)
-    assert sorted(strip_windows(iterator_fn())) == range(11)
+    assert sorted(strip_windows(iterator_fn())) == list(range(11))
 
   def test_multiple_sources_single_reader_iterator_fn(self):
     sources = [
@@ -126,7 +126,7 @@
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=1)
-    assert list(strip_windows(iterator_fn())) == range(11)
+    assert list(strip_windows(iterator_fn())) == list(range(11))
 
   def test_source_iterator_single_source_exception(self):
     class MyException(Exception):
@@ -172,7 +172,7 @@ def perpetual_generator(value):
     with self.assertRaises(MyException):
       for value in iterator_fn():
         seen.add(value.value)
-    self.assertEqual(sorted(seen), range(5))
+    self.assertEqual(sorted(seen), list(range(5)))
 
 
 class EmulatedCollectionsTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index 8a63e7bd0561..a62595520ad3 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -20,9 +20,13 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 from abc import ABCMeta
 from abc import abstractmethod
 
+import six
+
 from apache_beam import coders
 from apache_beam import core
 from apache_beam import pvalue
@@ -41,11 +45,9 @@
     ]
 
 
-class Event(object):
+class Event(six.with_metaclass(ABCMeta, object)):
   """Test stream event to be emitted during execution of a TestStream."""
 
-  __metaclass__ = ABCMeta
-
   def __cmp__(self, other):
     if type(self) is not type(other):
       return cmp(type(self), type(other))
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index 0018c0ed1541..20060adec63f 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -50,7 +50,7 @@ def test_delete_files_fails_with_io_error(self):
       utils.delete_files([path])
     self.assertTrue(
         error.exception.args[0].startswith('Delete operation failed'))
-    self.assertEqual(error.exception.exception_details.keys(), [path])
+    self.assertEqual(list(error.exception.exception_details.keys()), [path])
 
   def test_delete_files_fails_with_invalid_arg(self):
     with self.assertRaises(RuntimeError):
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index c08d250e1165..fd95482221c9 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -157,6 +157,6 @@ def open_shards(glob_pattern):
   """Returns a composite file of all shards matching the given glob pattern."""
   with tempfile.NamedTemporaryFile(delete=False) as f:
     for shard in glob.glob(glob_pattern):
-      f.write(file(shard).read())
+      f.write(open(shard).read())
     concatenated_file_name = f.name
-  return file(concatenated_file_name, 'rb')
+  return open(concatenated_file_name, 'rb')
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index b6f19c6c03eb..b5ac7adc7382 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -87,7 +87,7 @@ def add_input(self, sum_count, element):
     return sum_ + element, count + 1
 
   def merge_accumulators(self, accumulators):
-    sums, counts = zip(*accumulators)
+    sums, counts = list(zip(*accumulators))
     return sum(sums), sum(counts)
 
   def extract_output(self, sum_count):
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 705106e784c5..0e94a98dd726 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -135,7 +135,9 @@ def test_combine_fn(combine_fn, shards, expected):
       final_accumulator = combine_fn.merge_accumulators(accumulators)
       self.assertEqual(combine_fn.extract_output(final_accumulator), expected)
 
-    test_combine_fn(combine.TopCombineFn(3), [range(10), range(10)], [9, 9, 8])
+    test_combine_fn(combine.TopCombineFn(3),
+                    [range(10), range(10)],
+                    [9, 9, 8])
     test_combine_fn(combine.TopCombineFn(5),
                     [range(1000), range(100), range(1001)],
                     [1000, 999, 999, 998, 998])
@@ -284,7 +286,7 @@ def match(actual):
     def matcher():
       def match(actual):
         equal_to([1])([len(actual)])
-        equal_to(pairs)(actual[0].iteritems())
+        equal_to(pairs)(actual[0].items())
       return match
     assert_that(result, matcher())
     pipeline.run()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2d411ee75331..2d5658525612 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -23,6 +23,8 @@
 import inspect
 import types
 
+import six
+
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -845,7 +847,8 @@ def with_outputs(self, *tags, **main_kw):
     """
     main_tag = main_kw.pop('main', None)
     if main_kw:
-      raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
+      raise ValueError('Unexpected keyword arguments: %s'
+                       % list(main_kw.keys()))
     return _MultiParDo(self, tags, main_tag)
 
   def _pardo_fn_data(self):
@@ -1676,7 +1679,7 @@ def __init__(self, **kwargs):
     super(Flatten, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys()))
 
   def _extract_input_pvalues(self, pvalueish):
     try:
@@ -1721,7 +1724,7 @@ def __init__(self, value):
       value: An object of values for the PCollection
     """
     super(Create, self).__init__()
-    if isinstance(value, basestring):
+    if isinstance(value, six.string_types):
       raise TypeError('PTransform Create: Refusing to treat string as '
                       'an iterable. (string=%r)' % value)
     elif isinstance(value, dict):
@@ -1751,7 +1754,7 @@ def get_windowing(self, unused_inputs):
 
   @staticmethod
   def _create_source_from_iterable(values, coder):
-    return Create._create_source(map(coder.encode, values), coder)
+    return Create._create_source(list(map(coder.encode, values)), coder)
 
   @staticmethod
   def _create_source(serialized_values, coder):
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index cb7b53eb29aa..4206f2110b7d 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -44,6 +44,8 @@
 from datetime import datetime
 from datetime import timedelta
 
+import six
+
 __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
 
 
@@ -167,7 +169,7 @@ class DisplayDataItem(object):
       display item belongs to.
   """
   typeDict = {str:'STRING',
-              unicode:'STRING',
+              six.text_type:'STRING',
               int:'INTEGER',
               float:'FLOAT',
               bool: 'BOOLEAN',
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 5c73cf39a92f..90bde8caa8c4 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -22,7 +22,9 @@
 import unittest
 from datetime import datetime
 
+# pylint: disable=ungrouped-imports
 import hamcrest as hc
+import six
 from hamcrest.core.base_matcher import BaseMatcher
 
 import apache_beam as beam
@@ -31,6 +33,8 @@
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.display import HasDisplayData
 
+# pylint: enable=ungrouped-imports
+
 
 class DisplayDataItemMatcher(BaseMatcher):
   """ Matcher class for DisplayDataItems in unit tests.
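Aside: the core.py, display.py, and display_test.py hunks above replace the py2-only unicode and basestring builtins with six aliases. A small standalone sketch (illustrative only, not from the patch) of how those aliases behave on either interpreter:

    import six

    def classify(value):
        # six.text_type is unicode on py2 and str on py3;
        # six.string_types additionally matches the byte-oriented py2 str.
        if isinstance(value, six.text_type):
            return 'STRING'
        if isinstance(value, six.string_types):
            return 'STRING (py2 str)'
        return type(value).__name__

    assert classify(u'my literal string') == 'STRING'
    assert classify(3.5) == 'float'
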
@@ -161,7 +165,7 @@ def test_create_list_display_data(self):
 
   def test_unicode_type_display_data(self):
     class MyDoFn(beam.DoFn):
       def display_data(self):
-        return {'unicode_string': unicode('my string'),
+        return {'unicode_string': six.text_type('my string'),
                 'unicode_literal_string': u'my literal string'}
 
     fn = MyDoFn()
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index c7fc641804dc..9a2abd2e4781 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -611,7 +611,7 @@ def __init__(self, fn, *args, **kwargs):
     super(PTransformWithSideInputs, self).__init__()
 
     if (any([isinstance(v, pvalue.PCollection) for v in args]) or
-        any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):
+        any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])):
       raise error.SideInputError(
           'PCollection used directly as side input argument. Specify '
           'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 1d5883481b6f..59305e16233f 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -17,6 +17,8 @@
 
 """Unit tests for side inputs."""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
 
@@ -194,7 +196,7 @@ def match(actual):
         [[actual_elem, actual_list, actual_dict]] = actual
         equal_to([expected_elem])([actual_elem])
         equal_to(expected_list)(actual_list)
-        equal_to(expected_pairs)(actual_dict.iteritems())
+        equal_to(expected_pairs)(actual_dict.items())
       return match
 
     assert_that(results, matcher(1, a_list, some_pairs))
@@ -284,8 +286,8 @@ def matcher(expected_elem, expected_kvs):
       def match(actual):
         [[actual_elem, actual_dict1, actual_dict2]] = actual
         equal_to([expected_elem])([actual_elem])
-        equal_to(expected_kvs)(actual_dict1.iteritems())
-        equal_to(expected_kvs)(actual_dict2.iteritems())
+        equal_to(expected_kvs)(actual_dict1.items())
+        equal_to(expected_kvs)(actual_dict2.items())
       return match
 
     assert_that(results, matcher(1, some_kvs))
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 8d63d49baad0..851b9f427145 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -22,6 +22,8 @@
 from abc import ABCMeta
 from abc import abstractmethod
 
+import six
+
 __all__ = [
     'TimeDomain',
     ]
@@ -43,11 +45,9 @@ def from_string(domain):
     raise ValueError('Unknown time domain: %s' % domain)
 
 
-class TimestampCombinerImpl(object):
+class TimestampCombinerImpl(six.with_metaclass(ABCMeta, object)):
   """Implementation of TimestampCombiner."""
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def assign_output_time(self, window, input_timestamp):
     pass
@@ -72,11 +72,9 @@ def merge(self, unused_result_window, merging_timestamps):
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(TimestampCombinerImpl):
+class DependsOnlyOnWindow(six.with_metaclass(ABCMeta, TimestampCombinerImpl)):
   """TimestampCombinerImpl that only depends on the window."""
 
-  __metaclass__ = ABCMeta
-
   def combine(self, output_timestamp, other_output_timestamp):
     return output_timestamp
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 7d03240f9416..5ca6aa933daa 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -20,6 +20,8 @@
 Triggers control when in processing time windows get emitted.
 """
 
+from __future__ import absolute_import
+
 import collections
 import copy
 import itertools
@@ -28,6 +30,8 @@
 from abc import ABCMeta
 from abc import abstractmethod
 
+import six
+
 from apache_beam.coders import observable
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.transforms import combiners
@@ -67,14 +71,13 @@ class AccumulationMode(object):
   # RETRACTING = 3
 
 
-class _StateTag(object):
+class _StateTag(six.with_metaclass(ABCMeta, object)):
   """An identifier used to store and retrieve typed, combinable state.
 
   The given tag must be unique for this stage. If CombineFn is None then
   all elements will be returned as a list, otherwise the given CombineFn
   will be applied (possibly incrementally and eagerly) when adding elements.
   """
-  __metaclass__ = ABCMeta
 
   def __init__(self, tag):
     self.tag = tag
@@ -135,12 +138,11 @@ def with_prefix(self, prefix):  # pylint: disable=unused-argument
 
 
 # TODO(robertwb): Provisional API, Java likely to change as well.
-class TriggerFn(object):
+class TriggerFn(six.with_metaclass(ABCMeta, object)):
   """A TriggerFn determines when window (panes) are emitted.
 
   See https://beam.apache.org/documentation/programming-guide/#triggers
   """
-  __metaclass__ = ABCMeta
 
   @abstractmethod
   def on_element(self, element, window, context):
@@ -511,9 +513,7 @@ def to_runner_api(self, context):
             subtrigger=self.underlying.to_runner_api(context)))
 
 
-class _ParallelTriggerFn(TriggerFn):
-
-  __metaclass__ = ABCMeta
+class _ParallelTriggerFn(six.with_metaclass(ABCMeta, TriggerFn)):
 
   def __init__(self, *triggers):
     self.triggers = triggers
@@ -740,14 +740,12 @@ def clear_state(self, tag):
   # pylint: disable=unused-argument
 
 
-class SimpleState(object):
+class SimpleState(six.with_metaclass(ABCMeta, object)):
   """Basic state storage interface used for triggering.
 
   Only timers must hold the watermark (by their timestamp).
   """
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def set_timer(self, window, name, time_domain, timestamp):
     pass
@@ -859,7 +857,7 @@ def merge(self, to_be_merged, merge_result):
     self._persist_window_ids()
 
   def known_windows(self):
-    return self.window_ids.keys()
+    return list(self.window_ids.keys())
 
   def get_window(self, window_id):
     for window, ids in self.window_ids.items():
@@ -913,11 +911,9 @@ def create_trigger_driver(windowing,
   return driver
 
 
-class TriggerDriver(object):
+class TriggerDriver(six.with_metaclass(ABCMeta, object)):
   """Breaks a series of bundle and timer firings into window (pane)s."""
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def process_elements(self, state, windowed_values, output_watermark):
     pass
@@ -963,7 +959,7 @@ def __eq__(self, other):
     if isinstance(other, collections.Iterable):
       return all(
           a == b
-          for a, b in itertools.izip_longest(self, other, fillvalue=object()))
+          for a, b in itertools.zip_longest(self, other, fillvalue=object()))
     else:
       return NotImplemented
@@ -1042,7 +1038,7 @@ def process_elements(self, state, windowed_values, output_watermark):
     # First handle merging.
     if self.is_merging:
       old_windows = set(state.known_windows())
-      all_windows = old_windows.union(windows_to_elements.keys())
+      all_windows = old_windows.union(list(windows_to_elements.keys()))
 
       if all_windows != old_windows:
         merged_away = {}
@@ -1243,7 +1239,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
 
   def get_earliest_hold(self):
     earliest_hold = MAX_TIMESTAMP
-    for unused_window, tagged_states in self.state.iteritems():
+    for unused_window, tagged_states in six.iteritems(self.state):
      # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is
      # named "watermark". This is currently only true because the only place
      # watermark holds are set is in the GeneralTriggerDriver, where we use
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index d66736f6218f..06c0ebcb39d3 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -381,7 +381,7 @@ def test_picklable_output(self):
       pickle.dumps(unpicklable)
     for unwindowed in driver.process_elements(None, unpicklable, None):
       self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value,
-                       range(10))
+                       list(range(10)))
 
 
 class RunnerApiTest(unittest.TestCase):
@@ -425,7 +425,7 @@ def format_result(k_v):
             # A-10, A-11 never emitted due to AfterCount(3) never firing.
             'B-4': {6, 7, 8, 9},
             'B-3': {10, 15, 16},
-        }.iteritems()))
+        }.items()))
 
 
 class TranscriptTest(unittest.TestCase):
@@ -554,7 +554,7 @@ def fire_timers():
 
     for line in spec['transcript']:
 
-      action, params = line.items()[0]
+      action, params = list(line.items())[0]
 
       if action != 'expect':
         # Fail if we have output that was not expected in the transcript.
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 8185e64a67cf..103492a4312c 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -25,6 +25,8 @@
 import random
 import time
 
+import six
+
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
 from apache_beam.transforms import window
@@ -109,12 +111,12 @@ def __init__(self, **kwargs):
     super(CoGroupByKey, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys()))
 
   def _extract_input_pvalues(self, pvalueish):
     try:
       # If this works, it's a dict.
-      return pvalueish, tuple(pvalueish.viewvalues())
+      return pvalueish, tuple(six.viewvalues(pvalueish))
     except AttributeError:
       pcolls = tuple(pvalueish)
       return pcolls, pcolls
@@ -149,7 +151,7 @@ def _merge_tagged_vals_under_key(key_grouped, result_ctor,
   # pairs. The result value constructor makes tuples with len(pcolls) slots.
   pcolls = list(enumerate(pcolls))
   result_ctor_arg = len(pcolls)
-  result_ctor = lambda size: tuple([] for _ in xrange(size))
+  result_ctor = lambda size: tuple([] for _ in range(size))
 
   # Check input PCollections for PCollection-ness, and that they all belong
   # to the same pipeline.
@@ -260,7 +262,7 @@ def _thin_data(self):
     odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else []
     # Sort the pairs by how different they are.
     pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]),
-                   key=lambda ((x1, _1), (x2, _2)): x2 / x1)
+                   key=lambda p1_p2: p1_p2[1][0] / p1_p2[0][0])
     # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
     threshold = 2 * len(pairs) / 3
     self._data = (
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index c250e8c6d365..2955f93367b8 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -51,6 +51,7 @@
 
 import abc
 
+import six
 from google.protobuf import duration_pb2
 from google.protobuf import timestamp_pb2
 
@@ -108,11 +109,9 @@ def get_impl(timestamp_combiner, window_fn):
       raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
-class WindowFn(urns.RunnerApiFn):
+class WindowFn(six.with_metaclass(abc.ABCMeta, urns.RunnerApiFn)):
   """An abstract windowing function defining a basic assign and merge."""
 
-  __metaclass__ = abc.ABCMeta
-
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 7c7012c8aea7..99a24cf79b19 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -20,6 +20,8 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 import collections
 import inspect
 import sys
@@ -205,7 +207,7 @@ def add_input(self, accumulator, element, *args, **kwargs):
       except TypeCheckError as e:
         error_msg = ('Runtime type violation detected within %s: '
                      '%s' % (self._label, e))
-        raise TypeCheckError, error_msg, sys.exc_info()[2]
+        six.reraise(TypeCheckError, error_msg, sys.exc_info()[2])
     return self._combinefn.add_input(accumulator, element, *args, **kwargs)
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
@@ -220,7 +222,7 @@ def extract_output(self, accumulator, *args, **kwargs):
      except TypeCheckError as e:
        error_msg = ('Runtime type violation detected within %s: '
                     '%s' % (self._label, e))
-        raise TypeCheckError, error_msg, sys.exc_info()[2]
+        six.reraise(TypeCheckError, error_msg, sys.exc_info()[2])
    return result
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 3455672e7a82..7cad96b71afd 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -408,6 +408,9 @@ class AnyTypeConstraint(TypeConstraint):
   def __eq__(self, other):
     return type(self) == type(other)
 
+  def __hash__(self):
+    return hash(type(self))
+
   def __repr__(self):
     return 'Any'
 
@@ -420,6 +423,10 @@ class TypeVariable(AnyTypeConstraint):
   def __eq__(self, other):
     return type(self) == type(other) and self.name == other.name
 
+  def __hash__(self):
+    # TODO(BEAM-3730): A proper hash causes combiners_test tests to fail
+    return id(self)
+
   def __init__(self, name):
     self.name = name
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a069237e22be..27d3ce0c9777 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
+from __future__ import print_function
+
 """Apache Beam SDK for Python setup file."""
 
 from distutils.version import StrictVersion
@@ -94,9 +96,11 @@ def get_version():
 except ImportError:
   cythonize = lambda *args, **kwargs: []
 
+_AVRO = ('avro>=1.8.1,<2.0.0' if sys.version_info[0] == 2
+         else 'avro-python3>=1.8.1,<2.0.0')
 
 REQUIRED_PACKAGES = [
-    'avro>=1.8.1,<2.0.0',
+    _AVRO,
     'crcmod>=1.7,<2.0',
     'dill==0.2.6',
     'grpcio>=1.0,<2',
@@ -159,7 +163,7 @@ def generate_common_urns():
   src_time = os.path.getmtime(src) if os.path.exists(src) else -1
   out_time = os.path.getmtime(out) if os.path.exists(out) else -1
   if src_time > out_time:
-    print 'Regenerating common_urns module.'
+    print('Regenerating common_urns module.')
     urns = {}
     for m in re.finditer(
        r'\b(?:urn:)?beam:(\S+):(\S+):(v\S+)', open(src).read()):
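Aside: the setup.py hunk above selects the Avro dependency by interpreter major version; at the time, the avro distribution targeted Python 2 while avro-python3 was its Python 3 counterpart. A reduced standalone sketch of the same selection logic (requirement strings copied from the hunk; the surrounding scaffolding is illustrative only):

    from __future__ import print_function

    import sys

    # Pick the requirement string once, when setup.py is evaluated.
    _AVRO = ('avro>=1.8.1,<2.0.0' if sys.version_info[0] == 2
             else 'avro-python3>=1.8.1,<2.0.0')

    REQUIRED_PACKAGES = [
        _AVRO,
        'crcmod>=1.7,<2.0',
    ]

    print('Resolved Avro requirement:', _AVRO)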