From 8bf17d4b7939ec4ffe40d393d516e7db2d362083 Mon Sep 17 00:00:00 2001
From: Matthias Feys
Date: Wed, 20 Jun 2018 23:00:56 +0200
Subject: [PATCH 1/2] Futurize io subpackage

---
 sdks/python/apache_beam/io/__init__.py         |  2 +
 sdks/python/apache_beam/io/avroio.py           | 11 +++--
 sdks/python/apache_beam/io/avroio_test.py      |  2 +
 sdks/python/apache_beam/io/concat_source.py    |  3 ++
 .../apache_beam/io/concat_source_test.py       | 22 +++++-----
 sdks/python/apache_beam/io/filebasedsink.py    | 21 ++++++----
 .../apache_beam/io/filebasedsink_test.py       |  3 ++
 sdks/python/apache_beam/io/filebasedsource.py  | 18 ++++++---
 .../apache_beam/io/filebasedsource_test.py     | 32 +++++++++------
 sdks/python/apache_beam/io/filesystem.py       | 33 ++++++++++-----
 sdks/python/apache_beam/io/filesystem_test.py  | 40 ++++++++++++++-----
 sdks/python/apache_beam/io/filesystemio.py     | 13 +++---
 .../apache_beam/io/filesystemio_test.py        |  3 ++
 sdks/python/apache_beam/io/filesystems.py      | 13 ++++--
 .../python/apache_beam/io/filesystems_test.py  | 12 ++++--
 sdks/python/apache_beam/io/gcp/__init__.py     |  1 +
 sdks/python/apache_beam/io/gcp/bigquery.py     | 17 +++++---
 .../apache_beam/io/gcp/bigquery_test.py        |  4 +-
 .../apache_beam/io/gcp/datastore/__init__.py   |  1 +
 .../io/gcp/datastore/v1/__init__.py            |  1 +
 .../io/gcp/datastore/v1/adaptive_throttler.py  |  4 ++
 .../datastore/v1/adaptive_throttler_test.py    |  4 ++
 .../io/gcp/datastore/v1/datastoreio.py         |  9 +++--
 .../io/gcp/datastore/v1/datastoreio_test.py    | 10 ++++-
 .../io/gcp/datastore/v1/fake_datastore.py      |  3 ++
 .../apache_beam/io/gcp/datastore/v1/helper.py  | 20 +++++++---
 .../io/gcp/datastore/v1/helper_test.py         |  7 +++-
 .../io/gcp/datastore/v1/query_splitter.py      |  4 ++
 .../gcp/datastore/v1/query_splitter_test.py    |  2 +
 .../apache_beam/io/gcp/datastore/v1/util.py    |  5 +++
 .../apache_beam/io/gcp/gcsfilesystem.py        |  6 ++-
 .../apache_beam/io/gcp/gcsfilesystem_test.py   |  1 +
 sdks/python/apache_beam/io/gcp/gcsio.py        | 11 ++++-
 sdks/python/apache_beam/io/gcp/gcsio_test.py   |  7 +++-
 .../apache_beam/io/gcp/internal/__init__.py    |  1 +
 .../io/gcp/internal/clients/__init__.py        |  1 +
 .../gcp/internal/clients/bigquery/__init__.py  |  2 +
 .../clients/bigquery/bigquery_v2_client.py     |  5 ++-
 .../clients/bigquery/bigquery_v2_messages.py   |  4 +-
 .../gcp/internal/clients/storage/__init__.py   |  2 +
 .../clients/storage/storage_v1_client.py       |  6 ++-
 .../clients/storage/storage_v1_messages.py     |  5 ++-
 sdks/python/apache_beam/io/gcp/pubsub.py       |  9 +++--
 sdks/python/apache_beam/io/gcp/pubsub_test.py  |  5 +++
 .../apache_beam/io/gcp/tests/__init__.py       |  2 +
 .../io/gcp/tests/bigquery_matcher.py           |  2 +
 .../io/gcp/tests/bigquery_matcher_test.py      |  2 +
 .../io/gcp/tests/pubsub_matcher.py             |  2 +
 sdks/python/apache_beam/io/gcp/tests/utils.py  |  2 +
 .../apache_beam/io/gcp/tests/utils_test.py     |  2 +
 .../python/apache_beam/io/hadoopfilesystem.py  |  1 +
 .../apache_beam/io/hadoopfilesystem_test.py    |  5 ++-
 sdks/python/apache_beam/io/iobase.py           |  4 ++
 sdks/python/apache_beam/io/localfilesystem.py  |  1 +
 .../apache_beam/io/localfilesystem_test.py     | 12 ++++--
 sdks/python/apache_beam/io/range_trackers.py   | 15 ++++---
 .../apache_beam/io/range_trackers_test.py      | 13 ++++--
 .../apache_beam/io/restriction_trackers.py     |  5 ++-
 .../io/restriction_trackers_test.py            |  2 +
 .../apache_beam/io/source_test_utils.py        |  5 +++
 .../apache_beam/io/source_test_utils_test.py   |  3 ++
 sdks/python/apache_beam/io/sources_test.py     |  1 +
 sdks/python/apache_beam/io/textio.py           |  2 +
 sdks/python/apache_beam/io/textio_test.py      |  3 ++
 sdks/python/apache_beam/io/tfrecordio.py       |  1 +
 sdks/python/apache_beam/io/tfrecordio_test.py  | 26 +++++++-----
 sdks/python/apache_beam/io/utils.py            |  4 ++
 sdks/python/apache_beam/io/vcfio.py            | 15 +++++--
 sdks/python/apache_beam/io/vcfio_test.py       |  9 +++--
 69 files changed, 386 insertions(+), 138 deletions(-)

diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 6ea0efdf65f7..4cbb4458864b 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -18,6 +18,8 @@
 """A package defining several input sources and output sinks."""
 # pylint: disable=wildcard-import
 
+from __future__ import absolute_import
+
 from apache_beam.io.avroio import *
 from apache_beam.io.filebasedsink import *
 from apache_beam.io.iobase import Read
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 1368734f17d3..a7bd9f44e9d6 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -41,10 +41,12 @@
 that can be used to write a given ``PCollection`` of Python objects to an
 Avro file.
 """
+from __future__ import absolute_import
 
-import cStringIO
+import io
 import os
 import zlib
+from builtins import object
 from functools import partial
 
 import avro
@@ -53,6 +55,7 @@
 from avro import schema
 from fastavro.read import block_reader
 from fastavro.write import Writer
+from future import standard_library
 
 import apache_beam as beam
 from apache_beam.io import filebasedsink
@@ -62,6 +65,8 @@
 from apache_beam.io.iobase import Read
 from apache_beam.transforms import PTransform
 
+standard_library.install_aliases()
+
 __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro']
@@ -341,7 +346,7 @@ def _decompress_bytes(data, codec):
       # We take care to avoid extra copies of data while slicing large objects
       # by use of a buffer.
       result = snappy.decompress(buffer(data)[:-4])
-      avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
+      avroio.BinaryDecoder(io.BytesIO(data[-4:])).check_crc32(result)
       return result
     else:
       raise ValueError('Unknown codec: %r' % codec)
@@ -351,7 +356,7 @@ def num_records(self):
 
   def records(self):
     decoder = avroio.BinaryDecoder(
-        cStringIO.StringIO(self._decompressed_block_bytes))
+        io.BytesIO(self._decompressed_block_bytes))
     reader = avroio.DatumReader(
         writers_schema=self._schema, readers_schema=self._schema)
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 9b9a855fcdfe..93f2ba9ebfd3 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -14,12 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
 
 import json
 import logging
 import os
 import tempfile
 import unittest
+from builtins import range
 
 import avro.datafile
 import avro.schema
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 56c4ccabcf71..ddf3a77745ee 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -19,9 +19,12 @@
 Concat Source, which reads the union of several other sources.
""" +from __future__ import absolute_import +from __future__ import division import bisect import threading +from builtins import range from apache_beam.io import iobase diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 0f7dd547e76e..31e4392f6454 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -16,9 +16,12 @@ # """Unit tests for the sources framework.""" +from __future__ import absolute_import +from __future__ import division import logging import unittest +from builtins import range import apache_beam as beam from apache_beam.io import iobase @@ -91,10 +94,10 @@ def test_conact_source(self): RangeSource(12, 16), ]) self.assertEqual(list(source.read(source.get_range_tracker())), - range(16)) + list(range(16))) self.assertEqual(list(source.read(source.get_range_tracker((1, None), (2, 10)))), - range(4, 10)) + list(range(4, 10))) range_tracker = source.get_range_tracker(None, None) self.assertEqual(range_tracker.position_at_fraction(0), (0, 0)) self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8)) @@ -176,10 +179,11 @@ def test_single_source(self): read_all = source_test_utils.read_from_source range10 = RangeSource(0, 10) - self.assertEquals(read_all(ConcatSource([range10])), range(10)) - self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10)) + self.assertEquals(read_all(ConcatSource([range10])), list(range(10))) + self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), + list(range(5, 10))) self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)), - range(5)) + list(range(5))) def test_source_with_empty_ranges(self): read_all = source_test_utils.read_from_source @@ -189,11 +193,11 @@ def test_source_with_empty_ranges(self): range10 = RangeSource(0, 10) self.assertEquals(read_all(ConcatSource([empty, empty, range10])), - range(10)) + list(range(10))) self.assertEquals(read_all(ConcatSource([empty, range10, empty])), - range(10)) + list(range(10))) self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])), - range(10) + range(10)) + list(range(10)) + list(range(10))) def test_source_with_empty_ranges_exhastive(self): empty = RangeSource(0, 0) @@ -214,7 +218,7 @@ def test_run_concat_direct(self): ]) pipeline = TestPipeline() pcoll = pipeline | beam.io.Read(source) - assert_that(pcoll, equal_to(range(1000))) + assert_that(pcoll, equal_to(list(range(1000)))) pipeline.run() diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index 4c587b965f05..c4a746f5289d 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -24,8 +24,10 @@ import re import time import uuid +from builtins import range +from builtins import zip -from six import string_types +from future.utils import iteritems from apache_beam.internal import util from apache_beam.io import iobase @@ -41,6 +43,11 @@ __all__ = ['FileBasedSink'] +try: + unicode # pylint: disable=unicode-builtin +except NameError: + unicode = str + class FileBasedSink(iobase.Sink): """A sink to a GCS or local files. @@ -75,10 +82,10 @@ def __init__(self, ~exceptions.ValueError: if **shard_name_template** is not of expected format. 
""" - if not isinstance(file_path_prefix, (string_types, ValueProvider)): + if not isinstance(file_path_prefix, ((str, unicode), ValueProvider)): raise TypeError('file_path_prefix must be a string or ValueProvider;' 'got %r instead' % file_path_prefix) - if not isinstance(file_name_suffix, (string_types, ValueProvider)): + if not isinstance(file_name_suffix, ((str, unicode), ValueProvider)): raise TypeError('file_name_suffix must be a string or ValueProvider;' 'got %r instead' % file_name_suffix) @@ -89,9 +96,9 @@ def __init__(self, shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE elif shard_name_template == '': num_shards = 1 - if isinstance(file_path_prefix, string_types): + if isinstance(file_path_prefix, (str, unicode)): file_path_prefix = StaticValueProvider(str, file_path_prefix) - if isinstance(file_name_suffix, string_types): + if isinstance(file_name_suffix, (str, unicode)): file_name_suffix = StaticValueProvider(str, file_name_suffix) self.file_path_prefix = file_path_prefix self.file_name_suffix = file_name_suffix @@ -297,7 +304,7 @@ def _rename_batch(batch): except BeamIOError as exp: if exp.exception_details is None: raise - for (src, dst), exception in exp.exception_details.iteritems(): + for (src, dst), exception in iteritems(exp.exception_details): if exception: logging.error(('Exception in _rename_batch. src: %s, ' 'dst: %s, err: %s'), src, dst, exception) @@ -307,7 +314,7 @@ def _rename_batch(batch): return exceptions exception_batches = util.run_using_threadpool( - _rename_batch, zip(source_file_batch, destination_file_batch), + _rename_batch, list(zip(source_file_batch, destination_file_batch)), num_threads) all_exceptions = [e for exception_batch in exception_batches diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py index 05ac5228992a..b79370eff4ad 100644 --- a/sdks/python/apache_beam/io/filebasedsink_test.py +++ b/sdks/python/apache_beam/io/filebasedsink_test.py @@ -18,12 +18,15 @@ """Unit tests for file sinks.""" +from __future__ import absolute_import + import glob import logging import os import shutil import tempfile import unittest +from builtins import range import hamcrest as hc import mock diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 4509a3616b59..f06387ee42c6 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -26,8 +26,7 @@ :class:`~apache_beam.io._AvroSource`. """ -from six import integer_types -from six import string_types +from __future__ import absolute_import from apache_beam.internal import pickler from apache_beam.io import concat_source @@ -49,6 +48,13 @@ __all__ = ['FileBasedSource'] +try: + unicode # pylint: disable=unicode-builtin + long # pylint: disable=long-builtin +except NameError: + unicode = str + long = int + class FileBasedSource(iobase.BoundedSource): """A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of @@ -99,12 +105,12 @@ def __init__(self, result. 
""" - if not isinstance(file_pattern, (string_types, ValueProvider)): + if not isinstance(file_pattern, ((str, unicode), ValueProvider)): raise TypeError('%s: file_pattern must be of type string' ' or ValueProvider; got %r instead' % (self.__class__.__name__, file_pattern)) - if isinstance(file_pattern, string_types): + if isinstance(file_pattern, (str, unicode)): file_pattern = StaticValueProvider(str, file_pattern) self._pattern = file_pattern @@ -235,11 +241,11 @@ class _SingleFileSource(iobase.BoundedSource): def __init__(self, file_based_source, file_name, start_offset, stop_offset, min_bundle_size=0, splittable=True): - if not isinstance(start_offset, integer_types): + if not isinstance(start_offset, (int, long)): raise TypeError( 'start_offset must be a number. Received: %r' % start_offset) if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY: - if not isinstance(stop_offset, integer_types): + if not isinstance(stop_offset, (int, long)): raise TypeError( 'stop_offset must be a number. Received: %r' % stop_offset) if start_offset >= stop_offset: diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index c567b24e77a0..a8ad82253108 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -15,17 +14,22 @@ # limitations under the License. # +from __future__ import absolute_import + import bz2 -import cStringIO import gzip +import io import logging import math import os import random import tempfile import unittest +from builtins import object +from builtins import range import hamcrest as hc +from future import standard_library import apache_beam as beam from apache_beam.io import filebasedsource @@ -44,6 +48,8 @@ from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher +standard_library.install_aliases() + class LineSource(FileBasedSource): @@ -153,7 +159,7 @@ def __init__(self, values): def split(self, desired_bundle_size, start_position=None, stop_position=None): # simply devides values into two bundles - middle = len(self._values) / 2 + middle = len(self._values) // 2 yield iobase.SourceBundle(0.5, TestConcatSource.DummySource( self._values[:middle]), None, None) yield iobase.SourceBundle(0.5, TestConcatSource.DummySource( @@ -188,11 +194,11 @@ def test_read(self): concat = ConcatSource(sources) range_tracker = concat.get_range_tracker(None, None) read_data = [value for value in concat.read(range_tracker)] - self.assertItemsEqual(range(30), read_data) + self.assertItemsEqual(list(range(30)), read_data) def test_split(self): - sources = [TestConcatSource.DummySource(range(start, start + 10)) for start - in [0, 10, 20]] + sources = [TestConcatSource.DummySource(list(range(start, start + 10))) + for start in [0, 10, 20]] concat = ConcatSource(sources) splits = [split for split in concat.split()] self.assertEquals(6, len(splits)) @@ -205,7 +211,7 @@ def test_split(self): split.stop_position) read_data.extend([value for value in split.source.read( range_tracker_for_split)]) - self.assertItemsEqual(range(30), read_data) + self.assertItemsEqual(list(range(30)), read_data) def test_estimate_size(self): sources = [TestConcatSource.DummySource(range(start, 
@@ -473,8 +479,8 @@ def test_read_pattern_gzip(self):
       chunks = [lines[splits[i-1]:splits[i]] for i in range(1, len(splits))]
       compressed_chunks = []
       for c in chunks:
-        out = cStringIO.StringIO()
-        with gzip.GzipFile(fileobj=out, mode="w") as f:
+        out = io.BytesIO()
+        with gzip.GzipFile(fileobj=out, mode="wb") as f:
          f.write('\n'.join(c))
         compressed_chunks.append(out.getvalue())
       file_pattern = write_prepared_pattern(compressed_chunks)
@@ -520,8 +526,8 @@ def test_read_auto_pattern(self):
     chunks = [lines[splits[i - 1]:splits[i]] for i in range(1, len(splits))]
     compressed_chunks = []
     for c in chunks:
-      out = cStringIO.StringIO()
-      with gzip.GzipFile(fileobj=out, mode="w") as f:
+      out = io.BytesIO()
+      with gzip.GzipFile(fileobj=out, mode="wb") as f:
         f.write('\n'.join(c))
       compressed_chunks.append(out.getvalue())
     file_pattern = write_prepared_pattern(
@@ -540,8 +546,8 @@ def test_read_auto_pattern_compressed_and_uncompressed(self):
     chunks_to_write = []
     for i, c in enumerate(chunks):
       if i%2 == 0:
-        out = cStringIO.StringIO()
-        with gzip.GzipFile(fileobj=out, mode="w") as f:
+        out = io.BytesIO()
+        with gzip.GzipFile(fileobj=out, mode="wb") as f:
          f.write('\n'.join(c))
        chunks_to_write.append(out.getvalue())
      else:
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 0b99793dca00..96799b110a9f 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -22,10 +22,10 @@
 """
 from __future__ import absolute_import
+from __future__ import division
 
 import abc
 import bz2
-import cStringIO
 import fnmatch
 import logging
 import os
@@ -33,12 +33,17 @@
 import re
 import time
 import zlib
+from builtins import object
+from builtins import zip
+from io import BytesIO
 
-from six import integer_types
-from six import string_types
+from future import standard_library
+from future.utils import with_metaclass
 
 from apache_beam.utils.plugin import BeamPlugin
 
+standard_library.install_aliases()
+
 logger = logging.getLogger(__name__)
 
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -46,6 +51,13 @@
 __all__ = ['CompressionTypes', 'CompressedFile', 'FileMetadata', 'FileSystem',
            'MatchResult']
 
+try:
+  unicode  # pylint: disable=unicode-builtin
+  long  # pylint: disable=long-builtin
+except NameError:
+  unicode = str
+  long = int
+
 
 class CompressionTypes(object):
   """Enum-like class representing known compression types."""
@@ -91,7 +103,7 @@ def detect_compression_type(cls, file_path):
     """Returns the compression type of a file (based on its suffix)."""
     compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
     lowercased_path = file_path.lower()
-    for suffix, compression_type in compression_types_by_suffix.iteritems():
+    for suffix, compression_type in compression_types_by_suffix.items():
       if lowercased_path.endswith(suffix):
         return compression_type
     return cls.UNCOMPRESSED
@@ -131,7 +143,7 @@ def __init__(self,
 
     if self.readable():
       self._read_size = read_size
-      self._read_buffer = cStringIO.StringIO()
+      self._read_buffer = BytesIO()
       self._read_position = 0
       self._read_eof = False
@@ -246,13 +258,13 @@ def readline(self):
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
 
-    io = cStringIO.StringIO()
+    io = BytesIO()
     while True:
       # Ensure that the internal buffer has at least half the read_size. Going
       # with half the _read_size (as opposed to a full _read_size) to ensure
       # that actual fetches are more evenly spread out, as opposed to having 2
       # consecutive reads at the beginning of a read.
-      self._fetch_to_internal_buffer(self._read_size / 2)
+      self._fetch_to_internal_buffer(self._read_size // 2)
       line = self._read_from_internal_buffer(
           lambda: self._read_buffer.readline())
       io.write(line)
@@ -382,8 +394,8 @@ class FileMetadata(object):
   """Metadata about a file path that is the output of FileSystem.match
   """
   def __init__(self, path, size_in_bytes):
-    assert isinstance(path, string_types) and path, "Path should be a string"
-    assert isinstance(size_in_bytes, integer_types) and size_in_bytes >= 0, \
+    assert isinstance(path, (str, unicode)) and path, "Path should be a string"
+    assert isinstance(size_in_bytes, (int, long)) and size_in_bytes >= 0, \
         "Invalid value for size_in_bytes should %s (of type %s)" % (
            size_in_bytes, type(size_in_bytes))
     self.path = path
@@ -432,14 +444,13 @@ def __init__(self, msg, exception_details=None):
     self.exception_details = exception_details
 
 
-class FileSystem(BeamPlugin):
+class FileSystem(with_metaclass(abc.ABCMeta, BeamPlugin)):
   """A class that defines the functions that can be performed on a filesystem.
 
   All methods are abstract and they are for file system providers to
  implement. Clients should use the FileSystems class to interact with
  the correct file system based on the provided file pattern scheme.
  """
-  __metaclass__ = abc.ABCMeta
   CHUNK_SIZE = 1  # Chuck size in the batch operations
 
   def __init__(self, pipeline_options):
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index f50f25e79e54..8c9ffde4feea 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -17,19 +17,28 @@
 #
 """Unit tests for filesystem module."""
+from __future__ import absolute_import
+from __future__ import division
+
 import bz2
 import gzip
 import logging
 import os
 import tempfile
 import unittest
-from StringIO import StringIO
+from builtins import range
+from io import BytesIO
+
+from future import standard_library
+from future.utils import iteritems
 
 from apache_beam.io.filesystem import CompressedFile
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileMetadata
 from apache_beam.io.filesystem import FileSystem
 
+standard_library.install_aliases()
+
 
 class TestingFileSystem(FileSystem):
 
@@ -59,7 +68,7 @@ def _insert_random_file(self, path, size):
     self._files[path] = size
 
   def _list(self, dir_or_prefix):
-    for path, size in self._files.iteritems():
+    for path, size in iteritems(self._files):
       if path.startswith(dir_or_prefix):
         yield FileMetadata(path, size)
@@ -254,14 +263,15 @@ def test_seek_set(self):
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
                                        read_size=self.read_block_size)
-        reference_fd = StringIO(self.content)
+        reference_fd = BytesIO(self.content)
 
-        # Note: content (readline) check must come before position (tell) check
-        # because cStringIO's tell() reports out of bound positions (if we seek
-        # beyond the file) up until a real read occurs.
+        # Note: BytesIO's tell() reports out of bound positions (if we seek
+        # beyond the file), therefore we need to cap it to max_position
         # _CompressedFile.tell() always stays within the bounds of the
         # uncompressed content.
-        for seek_position in (-1, 0, 1,
+        # Negative seek position argument is not supported for BytesIO with
+        # whence set to SEEK_SET.
+        for seek_position in (0, 1,
                               len(self.content)-1, len(self.content),
                               len(self.content) + 1):
           compressed_fd.seek(seek_position, os.SEEK_SET)
@@ -273,6 +283,8 @@ def test_seek_set(self):
 
           uncompressed_position = compressed_fd.tell()
           reference_position = reference_fd.tell()
+          max_position = len(self.content)
+          reference_position = min(reference_position, max_position)
           self.assertEqual(uncompressed_position, reference_position)
 
   def test_seek_cur(self):
@@ -281,13 +293,16 @@ def test_seek_cur(self):
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
                                        read_size=self.read_block_size)
-        reference_fd = StringIO(self.content)
+        reference_fd = BytesIO(self.content)
 
         # Test out of bound, inbound seeking in both directions
+        # Note: BytesIO's seek() reports out of bound positions (if we seek
+        # beyond the file), therefore we need to cap it to max_position (to
+        # make it consistent with the old StringIO behavior
         for seek_position in (-1, 0, 1,
-                              len(self.content) / 2,
-                              len(self.content) / 2,
-                              -1 * len(self.content) / 2):
+                              len(self.content) // 2,
+                              len(self.content) // 2,
+                              -1 * len(self.content) // 2):
           compressed_fd.seek(seek_position, os.SEEK_CUR)
           reference_fd.seek(seek_position, os.SEEK_CUR)
 
@@ -297,6 +312,9 @@ def test_seek_cur(self):
 
           reference_position = reference_fd.tell()
           uncompressed_position = compressed_fd.tell()
+          max_position = len(self.content)
+          reference_position = min(reference_position, max_position)
+          reference_fd.seek(reference_position, os.SEEK_SET)
           self.assertEqual(uncompressed_position, reference_position)
 
   def test_read_from_end_returns_no_data(self):
diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index 35e141bb7566..086ae164aef6 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -16,22 +16,25 @@
 #
 """Utilities for ``FileSystem`` implementations."""
 
+from __future__ import absolute_import
+
 import abc
 import io
 import os
+from builtins import object
+
+from future.utils import with_metaclass
 
 __all__ = ['Downloader', 'Uploader', 'DownloaderStream', 'UploaderStream',
            'PipeStream']
 
 
-class Downloader(object):
+class Downloader(with_metaclass(abc.ABCMeta, object)):
   """Download interface for a single file.
 
   Implementations should support random access reads.
   """
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractproperty
   def size(self):
     """Size of file to download."""
@@ -52,11 +55,9 @@ def get_range(self, start, end):
     """
 
 
-class Uploader(object):
+class Uploader(with_metaclass(abc.ABCMeta, object)):
   """Upload interface for a single file."""
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def put(self, data):
     """Write data to file sequentially.
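
The `with_metaclass` rewrites above (in `filesystem.py` and `filesystemio.py`) are the standard python-future idiom for abstract base classes: a Python 2 `__metaclass__` attribute is silently ignored by Python 3, so the metaclass must instead be injected through the base-class list. Below is a minimal illustrative sketch of the pattern, not part of the patch; the `Reader` name is made up, and it assumes the `future` package is installed.

import abc

from future.utils import with_metaclass


# Python 2 only (silently ignored on Python 3):
#   class Reader(object):
#     __metaclass__ = abc.ABCMeta
#
# Portable form: with_metaclass() builds an intermediate base class so that
# ABCMeta is applied under both Python 2 and Python 3.
class Reader(with_metaclass(abc.ABCMeta, object)):

  @abc.abstractmethod
  def read(self, size):
    """Read at most ``size`` bytes."""


try:
  Reader()  # Still abstract: instantiation raises TypeError on both versions.
except TypeError as e:
  print(e)
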
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index 41f238361044..75079a539c44 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -16,12 +16,15 @@
 #
 """Tests for filesystemio."""
 
+from __future__ import absolute_import
+
 import io
 import logging
 import multiprocessing
 import os
 import threading
 import unittest
+from builtins import range
 
 from apache_beam.io import filesystemio
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 66eff061fb06..55ad5d1e33ed 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -17,9 +17,10 @@
 
 """FileSystems interface class for accessing the correct filesystem"""
 
-import re
+from __future__ import absolute_import
 
-from six import string_types
+import re
+from builtins import object
 
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressionTypes
@@ -46,6 +47,12 @@
   from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
 except ImportError:
   pass
+
+try:
+  unicode  # pylint: disable=unicode-builtin
+except NameError:
+  unicode = str
+
 # pylint: enable=wrong-import-position, unused-import
 
 __all__ = ['FileSystems']
@@ -273,7 +280,7 @@ def delete(paths):
     Raises:
       ``BeamIOError`` if any of the delete operations fail
     """
-    if isinstance(paths, string_types):
+    if isinstance(paths, (str, unicode)):
       raise BeamIOError('Delete passed string argument instead of list: %s'
                         % paths)
     if len(paths) == 0:
diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py
index c084a3cf7bb4..be735ec7351f 100644
--- a/sdks/python/apache_beam/io/filesystems_test.py
+++ b/sdks/python/apache_beam/io/filesystems_test.py
@@ -18,6 +18,8 @@
 
 """Unit tests for LocalFileSystem."""
 
+from __future__ import absolute_import
+
 import filecmp
 import logging
 import os
@@ -124,7 +126,7 @@ def test_match_file_exception(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Unable to get the Filesystem') as error:
       FileSystems.match([None])
-    self.assertEqual(error.exception.exception_details.keys(), [None])
+    self.assertEqual(list(error.exception.exception_details.keys()), [None])
 
   def test_match_directory(self):
     path1 = os.path.join(self.tmpdir, 'f1')
@@ -158,7 +160,8 @@ def test_copy_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Copy operation failed') as error:
       FileSystems.copy([path1], [path2])
-    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+    self.assertEqual(list(error.exception.exception_details.keys()),
+                     [(path1, path2)])
 
   def test_copy_directory(self):
     path_t1 = os.path.join(self.tmpdir, 't1')
@@ -190,7 +193,8 @@ def test_rename_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Rename operation failed') as error:
       FileSystems.rename([path1], [path2])
-    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+    self.assertEqual(list(error.exception.exception_details.keys()),
+                     [(path1, path2)])
 
   def test_rename_directory(self):
     path_t1 = os.path.join(self.tmpdir, 't1')
@@ -231,7 +235,7 @@ def test_delete_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Delete operation failed') as error:
       FileSystems.delete([path1])
-    self.assertEqual(error.exception.exception_details.keys(), [path1])
+    self.assertEqual(list(error.exception.exception_details.keys()), [path1])
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/__init__.py b/sdks/python/apache_beam/io/gcp/__init__.py
index cce3acad34a4..f4f43cbb1236 100644
--- a/sdks/python/apache_beam/io/gcp/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 41d0833217bb..be5f3cb6cdac 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -119,8 +119,11 @@
 import re
 import time
 import uuid
+from builtins import object
+from builtins import zip
 
-from six import string_types
+from future.utils import iteritems
+from future.utils import itervalues
 
 from apache_beam import coders
 from apache_beam.internal.gcp import auth
@@ -143,6 +146,10 @@
   pass
 # pylint: enable=wrong-import-order, wrong-import-position
 
+try:
+  unicode  # pylint: disable=unicode-builtin
+except NameError:
+  unicode = str
 
 __all__ = [
     'TableRowJsonCoder',
@@ -212,7 +219,7 @@ def decode(self, encoded_table_row):
     od = json.loads(
         encoded_table_row, object_pairs_hook=collections.OrderedDict)
     return bigquery.TableRow(
-        f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
+        f=[bigquery.TableCell(v=to_json_value(e)) for e in itervalues(od)])
 
 
 def parse_table_schema_from_json(schema_string):
@@ -524,7 +531,7 @@ def __init__(self, table, dataset=None, project=None, schema=None,
     self.table_reference = _parse_table_reference(table, dataset, project)
     # Transform the table schema into a bigquery.TableSchema instance.
-    if isinstance(schema, string_types):
+    if isinstance(schema, (str, unicode)):
       # TODO(silviuc): Should add a regex-based validation of the format.
       table_schema = bigquery.TableSchema()
       schema_list = [s.strip(' ') for s in schema.split(',')]
@@ -1103,7 +1110,7 @@ def insert_rows(self, project_id, dataset_id, table_id, rows):
     final_rows = []
     for row in rows:
       json_object = bigquery.JsonObject()
-      for k, v in row.iteritems():
+      for k, v in iteritems(row):
         json_object.additionalProperties.append(
             bigquery.JsonObject.AdditionalProperty(
                 key=k, value=to_json_value(v)))
@@ -1415,7 +1422,7 @@ def get_dict_table_schema(schema):
       return schema
     elif schema is None:
       return schema
-    elif isinstance(schema, string_types):
+    elif isinstance(schema, (str, unicode)):
       table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
       return WriteToBigQuery.table_schema_to_dict(table_schema)
     elif isinstance(schema, bigquery.TableSchema):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index ff6721e6d91a..843fc394ff14 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -16,6 +16,7 @@
 #
 """Unit tests for BigQuery sources and sinks."""
+from __future__ import absolute_import
 
 import datetime
 import json
@@ -26,6 +27,7 @@
 
 import hamcrest as hc
 import mock
+from future.utils import iteritems
 
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
@@ -698,7 +700,7 @@ def test_rows_are_written(self):
     sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
     expected_rows = []
     json_object = bigquery.JsonObject()
-    for k, v in sample_row.iteritems():
+    for k, v in iteritems(sample_row):
       json_object.additionalProperties.append(
           bigquery.JsonObject.AdditionalProperty(
               key=k, value=to_json_value(v)))
diff --git a/sdks/python/apache_beam/io/gcp/datastore/__init__.py b/sdks/python/apache_beam/io/gcp/datastore/__init__.py
index cce3acad34a4..f4f43cbb1236 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py b/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
index cce3acad34a4..f4f43cbb1236 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
index 7d94f24ca859..f6c65a5fa1cf 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
@@ -19,7 +19,11 @@
 #
 # For internal use only; no backwards-compatibility guarantees.
+from __future__ import absolute_import
+from __future__ import division
+
 import random
+from builtins import object
 
 from apache_beam.io.gcp.datastore.v1 import util
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
index 1ac23930f65f..e3ccb9211886 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
@@ -15,7 +15,11 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+from __future__ import division
+
 import unittest
+from builtins import range
 
 from mock import patch
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 13209c17bd29..437f38850bac 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -16,9 +16,12 @@
 #
 """A connector for reading from and writing to Google Cloud Datastore"""
+from __future__ import absolute_import
+from __future__ import division
 
 import logging
 import time
+from builtins import object
 
 from apache_beam.io.gcp.datastore.v1 import helper
 from apache_beam.io.gcp.datastore.v1 import query_splitter
@@ -362,11 +365,11 @@ def get_batch_size(self, now):
       return _Mutate._WRITE_BATCH_INITIAL_SIZE
     recent_mean_latency_ms = (self._commit_time_per_entity_ms.sum(now)
-                              / self._commit_time_per_entity_ms.count(now))
+                              // self._commit_time_per_entity_ms.count(now))
     return max(_Mutate._WRITE_BATCH_MIN_SIZE,
                min(_Mutate._WRITE_BATCH_MAX_SIZE,
                    _Mutate._WRITE_BATCH_TARGET_LATENCY_MS
-                   / max(recent_mean_latency_ms, 1)
+                   // max(recent_mean_latency_ms, 1)
                   ))
 
   def report_latency(self, now, latency_ms, num_mutations):
@@ -444,7 +447,7 @@ def _flush_batch(self):
     _, latency_ms = helper.write_mutations(
         self._datastore, self._project, self._mutations,
         self._throttler, self._update_rpc_stats,
-        throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
+        throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS//1000)
 
     logging.debug("Successfully wrote %d mutations in %dms.",
                   len(self._mutations), latency_ms)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index e131f93d5207..b7bc22a58709 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -15,9 +15,14 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+from __future__ import division
 from __future__ import print_function
 
 import unittest
+from builtins import map
+from builtins import range
+from builtins import zip
 
 from mock import MagicMock
 from mock import call
@@ -179,7 +184,8 @@ def check_DatastoreWriteFn(self, num_entities):
     entities = [e.entity for e in
                 fake_datastore.create_entities(num_entities)]
 
-    expected_mutations = map(WriteToDatastore.to_upsert_mutation, entities)
+    expected_mutations = list(map(WriteToDatastore.to_upsert_mutation,
+                                  entities))
     actual_mutations = []
 
     self._mock_datastore.commit.side_effect = (
@@ -195,7 +201,7 @@ def check_DatastoreWriteFn(self, num_entities):
 
     self.assertEqual(actual_mutations, expected_mutations)
     self.assertEqual(
-        (num_entities - 1) / _Mutate._WRITE_BATCH_INITIAL_SIZE + 1,
+        (num_entities - 1) // _Mutate._WRITE_BATCH_INITIAL_SIZE + 1,
         self._mock_datastore.commit.call_count)
 
   def test_DatastoreWriteLargeEntities(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
index aa3780558d7f..054df9ddbcc3 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
@@ -20,7 +20,10 @@
 For internal use only; no backwards-compatibility guarantees.
 """
+from __future__ import absolute_import
+
 import uuid
+from builtins import range
 
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index 87d798bebe3f..40261c59caa3 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -24,10 +24,10 @@
 import logging
 import sys
 import time
+from builtins import next
+from builtins import object
 from socket import error as SocketError
 
-import six
-
 # pylint: disable=ungrouped-imports
 from apache_beam.internal.gcp import auth
 from apache_beam.utils import retry
@@ -49,6 +49,11 @@
 
 # pylint: enable=ungrouped-imports
 
+try:
+  unicode  # pylint: disable=unicode-builtin
+except NameError:
+  unicode = str
+
 
 def key_comparator(k1, k2):
   """A comparator for Datastore keys.
@@ -90,7 +95,7 @@ def compare_path(p1, p2):
 
   3. If no `id` is defined for both paths, then their `names` are compared.
""" - result = cmp(p1.kind, p2.kind) + result = (p1.kind > p2.kind) - (p1.kind < p2.kind) if result != 0: return result @@ -98,12 +103,12 @@ def compare_path(p1, p2): if not p2.HasField('id'): return -1 - return cmp(p1.id, p2.id) + return (p1.id > p2.id) - (p1.id < p2.id) if p2.HasField('id'): return 1 - return cmp(p1.name, p2.name) + return (p1.name > p2.name) - (p1.name < p2.name) def get_datastore(project): @@ -255,7 +260,7 @@ def make_kind_stats_query(namespace, kind, latest_timestamp): kind_filter = datastore_helper.set_property_filter( query_pb2.Filter(), 'kind_name', PropertyFilter.EQUAL, - six.text_type(kind)) + unicode(kind)) timestamp_filter = datastore_helper.set_property_filter( query_pb2.Filter(), 'timestamp', PropertyFilter.EQUAL, latest_timestamp) @@ -295,6 +300,9 @@ def _next_batch(self): resp = self._datastore.run_query(self._req) return resp + def __next__(self): + return next(self.__iter__()) + def __iter__(self): more_results = True while more_results: diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py index 90a366842da4..764dadabb35d 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py @@ -16,10 +16,13 @@ # """Tests for datastore helper.""" +from __future__ import absolute_import + import errno import random import sys import unittest +from builtins import map from socket import error as SocketError from mock import MagicMock @@ -82,7 +85,7 @@ def test_query_iterator(self): self.permanent_retriable_datastore_failure) query_iterator = helper.QueryIterator("project", None, self._query, self._mock_datastore) - self.assertRaises(RPCError, iter(query_iterator).next) + self.assertRaises(RPCError, query_iterator.__next__) self.assertEqual(6, len(self._mock_datastore.run_query.call_args_list)) def test_query_iterator_with_transient_failures(self): @@ -104,7 +107,7 @@ def test_query_iterator_with_non_retriable_failures(self): query_iterator = helper.QueryIterator("project", None, self._query, self._mock_datastore) self.assertRaises(tuple(map(type, self._non_retriable_errors)), - iter(query_iterator).next) + query_iterator.__next__) self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list)) def test_query_iterator_with_single_batch(self): diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py index d5674f9cbf15..7723fb7b6465 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py @@ -16,6 +16,10 @@ # """Implements a Cloud Datastore query splitter.""" +from __future__ import absolute_import +from __future__ import division + +from builtins import range from apache_beam.io.gcp.datastore.v1 import helper diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py index 52f25facd058..c5bfdd81cf8b 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py @@ -17,6 +17,8 @@ """Cloud Datastore query splitter test.""" +from __future__ import absolute_import + import unittest from mock import MagicMock diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/util.py b/sdks/python/apache_beam/io/gcp/datastore/v1/util.py index 5670a241ba80..d796225e1853 100644 --- 
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
@@ -19,7 +19,12 @@
 #
 # For internal use only; no backwards-compatibility guarantees.
 
+from __future__ import absolute_import
+from __future__ import division
+
 import math
+from builtins import object
+from builtins import range
 
 
 class MovingSum(object):
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index b861446c87e6..a52df0ef9466 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -18,6 +18,10 @@
 
 from __future__ import absolute_import
 
+from builtins import zip
+
+from future.utils import iteritems
+
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressedFile
 from apache_beam.io.filesystem import CompressionTypes
@@ -117,7 +121,7 @@ def _list(self, dir_or_prefix):
       ``BeamIOError`` if listing fails, but not if no files were found.
     """
     try:
-      for path, size in gcsio.GcsIO().list_prefix(dir_or_prefix).iteritems():
+      for path, size in iteritems(gcsio.GcsIO().list_prefix(dir_or_prefix)):
         yield FileMetadata(path, size)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index f0d46f9620ba..88f7ce93f46a 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -20,6 +20,7 @@
 
 import logging
 import unittest
+from builtins import zip
 
 import mock
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 72d1de40153d..81723d9c1912 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -20,7 +20,8 @@
 https://github.com/GoogleCloudPlatform/appengine-gcs-client.
 """
 
-import cStringIO
+from __future__ import absolute_import
+
 import errno
 import io
 import logging
@@ -30,8 +31,10 @@
 import threading
 import time
 import traceback
+from builtins import object
 
 import httplib2
+from future import standard_library
 
 from apache_beam.io.filesystemio import Downloader
 from apache_beam.io.filesystemio import DownloaderStream
@@ -40,6 +43,9 @@
 from apache_beam.io.filesystemio import UploaderStream
 from apache_beam.utils import retry
 
+standard_library.install_aliases()
+
+
 __all__ = ['GcsIO']
@@ -468,7 +474,7 @@ def __init__(self, client, path, buffer_size):
     self._get_request.generation = metadata.generation
 
     # Initialize read buffer state.
-    self._download_stream = cStringIO.StringIO()
+    self._download_stream = io.BytesIO()
     self._downloader = transfer.Download(
         self._download_stream, auto_transfer=False,
         chunksize=self._buffer_size)
     self._client.objects.Get(self._get_request, download=self._downloader)
@@ -483,6 +489,7 @@ def size(self):
     return self._size
 
   def get_range(self, start, end):
+    self._download_stream.seek(0)
     self._download_stream.truncate(0)
     self._downloader.GetRange(start, end - 1)
     return self._download_stream.getvalue()
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index b2bb43e30e8e..b10926c41906 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -15,12 +15,15 @@
 # limitations under the License.
# """Tests for Google Cloud Storage client.""" +from __future__ import division import errno import logging import os import random import unittest +from builtins import object +from builtins import range import httplib2 import mock @@ -606,7 +609,7 @@ def test_context_manager(self): # Test that exceptions are not swallowed by the context manager. with self.assertRaises(ZeroDivisionError): with self.gcs.open(file_name) as f: - f.read(0 / 0) + f.read(0 // 0) def test_list_prefix(self): bucket_name = 'gcsio-test' @@ -637,7 +640,7 @@ def test_list_prefix(self): expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size) for (object_name, size) in expected_object_names] self.assertEqual( - set(self.gcs.list_prefix(file_pattern).iteritems()), + set(self.gcs.list_prefix(file_pattern).items()), set(expected_file_names)) diff --git a/sdks/python/apache_beam/io/gcp/internal/__init__.py b/sdks/python/apache_beam/io/gcp/internal/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/io/gcp/internal/__init__.py +++ b/sdks/python/apache_beam/io/gcp/internal/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py b/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py index cce3acad34a4..f4f43cbb1236 100644 --- a/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py +++ b/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py index 732c1c6bd5d4..e5d35e8f805b 100644 --- a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py +++ b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py @@ -18,6 +18,8 @@ """Common imports for generated bigquery client library.""" # pylint:disable=wildcard-import +from __future__ import absolute_import + import pkgutil # Protect against environments where apitools library is not available. diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py index 201a1830b878..9b8cddee03ff 100644 --- a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py +++ b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py @@ -17,9 +17,12 @@ """Generated client library for bigquery version v2.""" # NOTE: This file is autogenerated and should not be edited by hand. 
+from __future__ import absolute_import
+
 from apitools.base.py import base_api
 
-from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_messages as messages
+from apache_beam.io.gcp.internal.clients.bigquery import \
+    bigquery_v2_messages as messages
 
 
 class BigqueryV2(base_api.BaseApiClient):
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py
index 3e741cdbbd63..d96582eaa61c 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py
@@ -20,10 +20,10 @@
 A data platform for customers to create, manage, share and query data.
 """
 # NOTE: This file is autogenerated and should not be edited by hand.
+from __future__ import absolute_import
 
 from apitools.base.protorpclite import messages as _messages
-from apitools.base.py import encoding
-from apitools.base.py import extra_types
+from apitools.base.py import encoding, extra_types
 
 package = 'bigquery'
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
index 8a726ef85dac..c26332355b7e 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
@@ -18,6 +18,8 @@
 """Common imports for generated storage client library."""
 # pylint:disable=wildcard-import
 
+from __future__ import absolute_import
+
 import pkgutil
 
 # Protect against environments where apitools library is not available.
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
index 1b46d917f143..1335bd4fe104 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
@@ -17,9 +17,13 @@
 """Generated client library for storage version v1."""
 # NOTE: This file is autogenerated and should not be edited by hand.
+
+from __future__ import absolute_import
+
 from apitools.base.py import base_api
 
-from apache_beam.io.gcp.internal.clients.storage import storage_v1_messages as messages
+from apache_beam.io.gcp.internal.clients.storage import \
+    storage_v1_messages as messages
 
 
 class StorageV1(base_api.BaseApiClient):
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
index 3c180a652bc5..95a31c430da1 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
@@ -21,10 +21,11 @@
 """
 # NOTE: This file is autogenerated and should not be edited by hand.
+from __future__ import absolute_import
+
 from apitools.base.protorpclite import message_types as _message_types
 from apitools.base.protorpclite import messages as _messages
-from apitools.base.py import encoding
-from apitools.base.py import extra_types
+from apitools.base.py import encoding, extra_types
 
 package = 'storage'
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 6db45bdbfa55..47e7904f0b8c 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -33,8 +33,7 @@
 from __future__ import absolute_import
 
 import re
-
-from six import text_type
+from builtins import object
 
 from apache_beam import coders
 from apache_beam.io.iobase import Read
@@ -49,6 +48,10 @@
 except ImportError:
   pubsub_pb2 = None
 
+try:
+  basestring
+except NameError:
+  basestring = str
 
 __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
            'WriteStringsToPubSub']
@@ -173,7 +176,7 @@ def expand(self, pvalue):
     p = (pvalue.pipeline
          | ReadFromPubSub(self.topic, self.subscription, self.id_label)
          | 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
-    p.element_type = text_type
+    p.element_type = basestring
     return p
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 165c072abb1b..01cb0c072efd 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -18,9 +18,14 @@
 
 """Unit tests for PubSub sources and sinks."""
 
+from __future__ import absolute_import
+
 import functools
 import logging
 import unittest
+from builtins import object
+from builtins import range
+from builtins import zip
 
 import hamcrest as hc
 import mock
diff --git a/sdks/python/apache_beam/io/gcp/tests/__init__.py b/sdks/python/apache_beam/io/gcp/tests/__init__.py
index cce3acad34a4..6569e3fe5de4 100644
--- a/sdks/python/apache_beam/io/gcp/tests/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/tests/__init__.py
@@ -14,3 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 8241a228cf09..c33f0db49983 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -17,6 +17,8 @@
 
 """Bigquery data verifier for end-to-end test."""
 
+from __future__ import absolute_import
+
 import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index a0977189e06b..e6ae9a06dc87 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -17,6 +17,8 @@
 
 """Unit test for Bigquery verifier"""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 695bfcd70f69..da906c6c6fe0 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -17,6 +17,8 @@
 
 """PubSub verifier used for end-to-end test."""
 
+from __future__ import absolute_import
+
 import logging
 import time
 from collections import Counter
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py b/sdks/python/apache_beam/io/gcp/tests/utils.py
index b4b4ba8b11f0..81fc4736c046 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -18,6 +18,8 @@
 
 """Utility methods for testing on GCP."""
 
+from __future__ import absolute_import
+
 import logging
 
 from apache_beam.utils import retry
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils_test.py b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
index a5c74bb34248..4ea65a9d86be 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
@@ -17,6 +17,8 @@
 
 """Unittest for GCP testing utils."""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 1fdd0713dd17..61c16dc4c240 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -24,6 +24,7 @@
 import logging
 import posixpath
 import re
+from builtins import zip
 
 import hdfs
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index d35b8d5b4e0e..cce0ac72a33b 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -23,6 +23,9 @@
 import logging
 import posixpath
 import unittest
+from builtins import object
+
+from future.utils import itervalues
 
 from apache_beam.io import hadoopfilesystem as hdfs
 from apache_beam.io.filesystem import BeamIOError
@@ -125,7 +128,7 @@ def list(self, path, status=False):
           'list must be called on a directory, got file: %s' % path)
 
     result = []
-    for file in self.files.itervalues():
+    for file in itervalues(self.files):
       if file.stat['path'].startswith(path):
         fs = file.get_file_status()
         result.append((fs[hdfs._FILE_STATUS_PATH_SUFFIX], fs))
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 7f9750b96099..34a50b54dbed 100644
--- a/sdks/python/apache_beam/io/iobase.py
a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -29,9 +29,13 @@ the sink. """ +from __future__ import absolute_import + import logging import random import uuid +from builtins import object +from builtins import range from collections import namedtuple from apache_beam import coders diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index 7e7f88d4e455..23a1e8a846c0 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -20,6 +20,7 @@ import os import shutil +from builtins import zip from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystem import CompressedFile diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py index 5f825eec3795..d6d8eb4c2173 100644 --- a/sdks/python/apache_beam/io/localfilesystem_test.py +++ b/sdks/python/apache_beam/io/localfilesystem_test.py @@ -18,6 +18,8 @@ """Unit tests for LocalFileSystem.""" +from __future__ import absolute_import + import filecmp import logging import os @@ -146,7 +148,7 @@ def test_match_file_exception(self): with self.assertRaisesRegexp(BeamIOError, r'^Match operation failed') as error: self.fs.match([None]) - self.assertEqual(error.exception.exception_details.keys(), [None]) + self.assertEqual(list(error.exception.exception_details.keys()), [None]) def test_match_glob(self): path1 = os.path.join(self.tmpdir, 'f1') @@ -190,7 +192,8 @@ def test_copy_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Copy operation failed') as error: self.fs.copy([path1], [path2]) - self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + self.assertEqual(list(error.exception.exception_details.keys()), + [(path1, path2)]) def test_copy_directory(self): path_t1 = os.path.join(self.tmpdir, 't1') @@ -222,7 +225,8 @@ def test_rename_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Rename operation failed') as error: self.fs.rename([path1], [path2]) - self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + self.assertEqual(list(error.exception.exception_details.keys()), + [(path1, path2)]) def test_rename_directory(self): path_t1 = os.path.join(self.tmpdir, 't1') @@ -273,7 +277,7 @@ def test_delete_error(self): with self.assertRaisesRegexp(BeamIOError, r'^Delete operation failed') as error: self.fs.delete([path1]) - self.assertEqual(error.exception.exception_details.keys(), [path1]) + self.assertEqual(list(error.exception.exception_details.keys()), [path1]) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 2da8736b1141..2613b364b6f0 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -17,18 +17,23 @@ """iobase.RangeTracker implementations provided with Apache Beam. 
""" +from __future__ import absolute_import +from __future__ import division import logging import math import threading -from six import integer_types - from apache_beam.io import iobase __all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker', 'OrderedPositionRangeTracker', 'UnsplittableRangeTracker'] +try: + long # pylint: disable=long-builtin +except NameError: + long = int + class OffsetRangeTracker(iobase.RangeTracker): """A 'RangeTracker' for non-negative positions of type 'long'.""" @@ -47,9 +52,9 @@ def __init__(self, start, end): raise ValueError('Start offset must not be \'None\'') if end is None: raise ValueError('End offset must not be \'None\'') - assert isinstance(start, integer_types) + assert isinstance(start, (int, long)) if end != self.OFFSET_INFINITY: - assert isinstance(end, integer_types) + assert isinstance(end, (int, long)) assert start <= end @@ -123,7 +128,7 @@ def set_current_position(self, record_start): self._last_record_start = record_start def try_split(self, split_offset): - assert isinstance(split_offset, integer_types) + assert isinstance(split_offset, (int, long)) with self._lock: if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY: logging.debug('refusing to split %r at %d: stop position unspecified', diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py index 6b8f7c7139c9..f0c7ca4ace85 100644 --- a/sdks/python/apache_beam/io/range_trackers_test.py +++ b/sdks/python/apache_beam/io/range_trackers_test.py @@ -16,16 +16,21 @@ # """Unit tests for the range_trackers module.""" +from __future__ import absolute_import +from __future__ import division import copy import logging import math import unittest -from six import integer_types - from apache_beam.io import range_trackers +try: + long # pylint: disable=long-builtin +except NameError: + long = int + class OffsetRangeTrackerTest(unittest.TestCase): @@ -102,7 +107,7 @@ def test_get_position_for_fraction_dense(self): # Position must be an integer type. self.assertTrue(isinstance(tracker.position_at_fraction(0.0), - integer_types)) + (int, long))) # [3, 3) represents 0.0 of [3, 6) self.assertEqual(3, tracker.position_at_fraction(0.0)) # [3, 4) represents up to 1/3 of [3, 6) @@ -163,7 +168,7 @@ def test_try_split_points(self): tracker = range_trackers.OffsetRangeTracker(100, 400) def dummy_callback(stop_position): - return int(stop_position / 5) + return int(stop_position // 5) tracker.set_split_points_unclaimed_callback(dummy_callback) diff --git a/sdks/python/apache_beam/io/restriction_trackers.py b/sdks/python/apache_beam/io/restriction_trackers.py index 8aeecba3ae1d..014125f29006 100644 --- a/sdks/python/apache_beam/io/restriction_trackers.py +++ b/sdks/python/apache_beam/io/restriction_trackers.py @@ -16,8 +16,11 @@ # """`iobase.RestrictionTracker` implementations provided with Apache Beam.""" +from __future__ import absolute_import +from __future__ import division import threading +from builtins import object from apache_beam.io.iobase import RestrictionTracker from apache_beam.io.range_trackers import OffsetRangeTracker @@ -48,7 +51,7 @@ def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1): remaining = self.stop - current_split_stop # Avoiding a small split at the end. 
- if (remaining < desired_num_offsets_per_split / 4 or + if (remaining < desired_num_offsets_per_split // 4 or remaining < min_num_offsets_per_split): current_split_stop = self.stop diff --git a/sdks/python/apache_beam/io/restriction_trackers_test.py b/sdks/python/apache_beam/io/restriction_trackers_test.py index e8a799f28f1f..2820426690f4 100644 --- a/sdks/python/apache_beam/io/restriction_trackers_test.py +++ b/sdks/python/apache_beam/io/restriction_trackers_test.py @@ -17,6 +17,8 @@ """Unit tests for the range_trackers module.""" +from __future__ import absolute_import + import logging import unittest diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py index 05e6e9c0d6f6..f60fafb913e2 100644 --- a/sdks/python/apache_beam/io/source_test_utils.py +++ b/sdks/python/apache_beam/io/source_test_utils.py @@ -43,10 +43,15 @@ * apache_beam.io.source_test_utils_test.py * apache_beam.io.avroio_test.py """ +from __future__ import absolute_import +from __future__ import division import logging import threading import weakref +from builtins import next +from builtins import object +from builtins import range from collections import namedtuple from multiprocessing.pool import ThreadPool diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py b/sdks/python/apache_beam/io/source_test_utils_test.py index 9a619c4821f2..94eb4401f6b2 100644 --- a/sdks/python/apache_beam/io/source_test_utils_test.py +++ b/sdks/python/apache_beam/io/source_test_utils_test.py @@ -15,9 +15,12 @@ # limitations under the License. # +from __future__ import absolute_import + import logging import tempfile import unittest +from builtins import range import apache_beam.io.source_test_utils as source_test_utils from apache_beam.io.filebasedsource_test import LineSource diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py index 8f885e59fb0b..40629ae83958 100644 --- a/sdks/python/apache_beam/io/sources_test.py +++ b/sdks/python/apache_beam/io/sources_test.py @@ -16,6 +16,7 @@ # """Unit tests for the sources framework.""" +from __future__ import absolute_import import logging import os diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index f5fd2da6056c..fee174bb3a1e 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -21,6 +21,8 @@ from __future__ import absolute_import import logging +from builtins import object +from builtins import range from functools import partial from six import integer_types diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 324f52adf75a..3606897049d2 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -16,6 +16,8 @@ # """Tests for textio module.""" +from __future__ import absolute_import +from __future__ import division import bz2 import glob @@ -25,6 +27,7 @@ import shutil import tempfile import unittest +from builtins import range import apache_beam as beam import apache_beam.io.source_test_utils as source_test_utils diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py index 0290bfaf2702..989247a96ee0 100644 --- a/sdks/python/apache_beam/io/tfrecordio.py +++ b/sdks/python/apache_beam/io/tfrecordio.py @@ -20,6 +20,7 @@ import logging import struct +from builtins import object from functools import partial import crcmod diff --git 
a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index c540cfae0502..ded8d7941596 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -15,18 +15,22 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import binascii
-import cStringIO
 import glob
 import gzip
+import io
 import logging
 import os
 import pickle
 import random
 import re
 import unittest
+from builtins import range
 
 import crcmod
+from future import standard_library
 
 import apache_beam as beam
 from apache_beam import Create
@@ -42,6 +46,8 @@
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
+standard_library.install_aliases()
+
 try:
   import tensorflow as tf  # pylint: disable=import-error
 except ImportError:
@@ -81,15 +87,15 @@ def setUp(self):
     self.record = binascii.a2b_base64(FOO_RECORD_BASE64)
 
   def _as_file_handle(self, contents):
-    result = cStringIO.StringIO()
+    result = io.BytesIO()
     result.write(contents)
-    result.reset()
+    result.seek(0)
     return result
 
   def _increment_value_at_index(self, value, index):
-    l = list(value)
-    l[index] = chr(ord(l[index]) + 1)
-    return ''.join(l)
+    l = bytearray(value)
+    l[index] += 1
+    return bytes(l)
 
   def _test_error(self, record, error_text):
     with self.assertRaisesRegexp(ValueError, re.escape(error_text)):
@@ -122,7 +128,7 @@ def test_masked_crc32c_crcmod(self):
             '\x03\x00\x00\x00\x00\x00\x00\x00', crc32c_fn=crc32c_fn))
 
   def test_write_record(self):
-    file_handle = cStringIO.StringIO()
+    file_handle = io.BytesIO()
     _TFRecordUtil.write_record(file_handle, 'foo')
     self.assertEqual(self.record, file_handle.getvalue())
@@ -143,9 +149,9 @@ def test_read_record_invalid_data_mask(self):
 
   def test_compatibility_read_write(self):
     for record in ['', 'blah', 'another blah']:
-      file_handle = cStringIO.StringIO()
+      file_handle = io.BytesIO()
       _TFRecordUtil.write_record(file_handle, record)
-      file_handle.reset()
+      file_handle.seek(0)
       actual = _TFRecordUtil.read_record(file_handle)
       self.assertEqual(record, actual)
@@ -391,7 +397,7 @@ class TestEnd2EndWriteAndRead(unittest.TestCase):
   def create_inputs(self):
     input_array = [[random.random() - 0.5 for _ in range(15)]
                    for _ in range(12)]
-    memfile = cStringIO.StringIO()
+    memfile = io.BytesIO()
     pickle.dump(input_array, memfile)
     return memfile.getvalue()
@@ -445,7 +451,7 @@ def test_end2end_example_proto(self):
     file_path_prefix = temp_dir.create_temp_file('result')
 
     example = tf.train.Example()
-    example.features.feature['int'].int64_list.value.extend(range(3))
+    example.features.feature['int'].int64_list.value.extend(list(range(3)))
     example.features.feature['bytes'].bytes_list.value.extend(
         [b'foo', b'bar'])
 
diff --git a/sdks/python/apache_beam/io/utils.py b/sdks/python/apache_beam/io/utils.py
index d6b312d7a67f..1dfadb5b73c4 100644
--- a/sdks/python/apache_beam/io/utils.py
+++ b/sdks/python/apache_beam/io/utils.py
@@ -19,6 +19,10 @@
 on transforms.ptransform_test.test_read_metrics.
""" +from __future__ import absolute_import + +from builtins import range + from apache_beam.io import iobase from apache_beam.io.range_trackers import OffsetRangeTracker from apache_beam.metrics import Metrics diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py index 94c740b99640..cd3b617e2c73 100644 --- a/sdks/python/apache_beam/io/vcfio.py +++ b/sdks/python/apache_beam/io/vcfio.py @@ -24,9 +24,11 @@ import logging import traceback +from builtins import next +from builtins import object from collections import namedtuple -from six import string_types +from future.utils import iteritems import vcf @@ -38,8 +40,10 @@ from apache_beam.transforms import PTransform try: - long # Python 2 + unicode # pylint: disable=unicode-builtin + int # Python 2 except NameError: + unicode = str long = int # Python 3 @@ -320,6 +324,9 @@ def __iter__(self): return self def next(self): + return self.__next__() + + def __next__(self): try: record = next(self._vcf_reader) return self._convert_to_variant_record(record, self._vcf_reader.infos, @@ -371,7 +378,7 @@ def _convert_to_variant_record(self, record, infos, formats): if record.FILTER is not None: variant.filters.extend( record.FILTER if record.FILTER else [PASS_FILTER]) - for k, v in record.INFO.iteritems(): + for k, v in iteritems(record.INFO): # Special case: END info value specifies end of the record, so adjust # variant.end and do not include it as part of variant.info. if k == END_INFO_KEY: @@ -406,7 +413,7 @@ def _convert_to_variant_record(self, record, infos, formats): # Note: this is already done for INFO fields in PyVCF. if (field in formats and formats[field].num is None and - isinstance(data, (int, float, long, string_types, bool))): + isinstance(data, (int, float, int, str, unicode, bool))): data = [data] call.info[field] = data variant.calls.append(call) diff --git a/sdks/python/apache_beam/io/vcfio_test.py b/sdks/python/apache_beam/io/vcfio_test.py index 029515fe3419..25b5d0cfa098 100644 --- a/sdks/python/apache_beam/io/vcfio_test.py +++ b/sdks/python/apache_beam/io/vcfio_test.py @@ -17,6 +17,8 @@ """Tests for vcfio module.""" +from __future__ import absolute_import + import logging import os import unittest @@ -74,9 +76,10 @@ def get_full_dir(): def _variant_comparator(v1, v2): if v1.reference_name == v2.reference_name: if v1.start == v2.start: - return cmp(v1.end, v2.end) - return cmp(v1.start, v2.start) - return cmp(v1.reference_name, v2.reference_name) + return (v1.end > v2.end) - (v1.end < v2.end) + return (v1.start > v2.start) - (v1.start < v2.start) + return (v1.reference_name > v2.reference_name) - \ + (v1.reference_name < v2.reference_name) # Helper method for verifying equal count on PCollection. 
From 071bc35d23874a70d777d79eb650076ae7143e13 Mon Sep 17 00:00:00 2001 From: Matthias Feys Date: Fri, 6 Jul 2018 11:08:31 +0200 Subject: [PATCH 2/2] incorporated all feedback for futurize io subpackage --- sdks/python/apache_beam/io/avroio.py | 3 --- sdks/python/apache_beam/io/filebasedsource.py | 10 +++------- sdks/python/apache_beam/io/filebasedsource_test.py | 3 --- sdks/python/apache_beam/io/filesystem.py | 10 +++++----- sdks/python/apache_beam/io/filesystems_test.py | 2 +- .../python/apache_beam/io/gcp/datastore/v1/helper.py | 12 +++--------- .../apache_beam/io/gcp/datastore/v1/helper_test.py | 4 ++-- sdks/python/apache_beam/io/gcp/gcsio.py | 4 ---- sdks/python/apache_beam/io/gcp/pubsub.py | 7 ++----- 9 files changed, 16 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index a7bd9f44e9d6..9b86b58982b5 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -55,7 +55,6 @@ from avro import schema from fastavro.read import block_reader from fastavro.write import Writer -from future import standard_library import apache_beam as beam from apache_beam.io import filebasedsink @@ -65,8 +64,6 @@ from apache_beam.io.iobase import Read from apache_beam.transforms import PTransform -standard_library.install_aliases() - __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro'] diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index f06387ee42c6..ec16b060c329 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -28,6 +28,9 @@ from __future__ import absolute_import +from past.builtins import long +from past.builtins import unicode + from apache_beam.internal import pickler from apache_beam.io import concat_source from apache_beam.io import iobase @@ -48,13 +51,6 @@ __all__ = ['FileBasedSource'] -try: - unicode # pylint: disable=unicode-builtin - long # pylint: disable=long-builtin -except NameError: - unicode = str - long = int - class FileBasedSource(iobase.BoundedSource): """A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index a8ad82253108..e9312238f5b5 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -29,7 +29,6 @@ from builtins import range import hamcrest as hc -from future import standard_library import apache_beam as beam from apache_beam.io import filebasedsource @@ -48,8 +47,6 @@ from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -standard_library.install_aliases() - class LineSource(FileBasedSource): diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 96799b110a9f..a9dafe6df703 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -27,6 +27,7 @@ import abc import bz2 import fnmatch +import io import logging import os import posixpath @@ -35,7 +36,6 @@ import zlib from builtins import object from builtins import zip -from io import BytesIO from future import standard_library from future.utils import with_metaclass @@ -143,7 +143,7 @@ def __init__(self, if self.readable(): self._read_size = read_size - self._read_buffer = BytesIO() + self._read_buffer = io.BytesIO() 
self._read_position = 0 self._read_eof = False @@ -258,7 +258,7 @@ def readline(self): if not self._decompressor: raise ValueError('decompressor not initialized') - io = BytesIO() + bytes_io = io.BytesIO() while True: # Ensure that the internal buffer has at least half the read_size. Going # with half the _read_size (as opposed to a full _read_size) to ensure @@ -267,11 +267,11 @@ def readline(self): self._fetch_to_internal_buffer(self._read_size // 2) line = self._read_from_internal_buffer( lambda: self._read_buffer.readline()) - io.write(line) + bytes_io.write(line) if line.endswith('\n') or not line: break # Newline or EOF reached. - return io.getvalue() + return bytes_io.getvalue() def closed(self): return not self._file or self._file.closed() diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py index be735ec7351f..383eb40ea072 100644 --- a/sdks/python/apache_beam/io/filesystems_test.py +++ b/sdks/python/apache_beam/io/filesystems_test.py @@ -126,7 +126,7 @@ def test_match_file_exception(self): with self.assertRaisesRegexp(BeamIOError, r'^Unable to get the Filesystem') as error: FileSystems.match([None]) - self.assertEqual(list(error.exception.exception_details.keys()), [None]) + self.assertEqual(list(error.exception.exception_details), [None]) def test_match_directory(self): path1 = os.path.join(self.tmpdir, 'f1') diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index 40261c59caa3..a27df09ac0e0 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -24,10 +24,12 @@ import logging import sys import time -from builtins import next from builtins import object from socket import error as SocketError +from future.builtins import next +from past.builtins import unicode + # pylint: disable=ungrouped-imports from apache_beam.internal.gcp import auth from apache_beam.utils import retry @@ -49,11 +51,6 @@ # pylint: enable=ungrouped-imports -try: - unicode # pylint: disable=unicode-builtin -except NameError: - unicode = str - def key_comparator(k1, k2): """A comparator for Datastore keys. 
@@ -300,9 +297,6 @@ def _next_batch(self):
     resp = self._datastore.run_query(self._req)
     return resp
 
-  def __next__(self):
-    return next(self.__iter__())
-
   def __iter__(self):
     more_results = True
     while more_results:
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 764dadabb35d..3e3f51762f4c 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -85,7 +85,7 @@ def test_query_iterator(self):
         self.permanent_retriable_datastore_failure)
     query_iterator = helper.QueryIterator("project", None, self._query,
                                           self._mock_datastore)
-    self.assertRaises(RPCError, query_iterator.__next__)
+    self.assertRaises(RPCError, lambda: next(iter(query_iterator)))
     self.assertEqual(6, len(self._mock_datastore.run_query.call_args_list))
 
   def test_query_iterator_with_transient_failures(self):
@@ -107,7 +107,7 @@ def test_query_iterator_with_non_retriable_failures(self):
     query_iterator = helper.QueryIterator("project", None, self._query,
                                           self._mock_datastore)
     self.assertRaises(tuple(map(type, self._non_retriable_errors)),
-                      query_iterator.__next__)
+                      lambda: next(iter(query_iterator)))
     self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list))
 
   def test_query_iterator_with_single_batch(self):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 81723d9c1912..a76199e865da 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -34,7 +34,6 @@
 from builtins import object
 
 import httplib2
-from future import standard_library
 
 from apache_beam.io.filesystemio import Downloader
 from apache_beam.io.filesystemio import DownloaderStream
@@ -43,9 +42,6 @@
 from apache_beam.io.filesystemio import UploaderStream
 from apache_beam.utils import retry
 
-standard_library.install_aliases()
-
-
 __all__ = ['GcsIO']
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 47e7904f0b8c..a57bef8f395e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -35,6 +35,8 @@
 import re
 from builtins import object
 
+from past.builtins import basestring
+
 from apache_beam import coders
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
@@ -48,11 +50,6 @@
 except ImportError:
   pubsub_pb2 = None
 
-try:
-  basestring
-except NameError:
-  basestring = str
-
 __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
            'WriteStringsToPubSub']
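The recurring change in patch 2 is to replace the ad-hoc `try: ... except NameError:` shims from patch 1 with imports from the `future` project's compatibility modules (`builtins` and `past.builtins`), which this series already relies on elsewhere (`from future.utils import iteritems`, `standard_library.install_aliases()`). A minimal sketch of the resulting style, assuming the `future` package is installed; the `describe` function is illustrative only and not part of the patch:

    from __future__ import absolute_import

    from past.builtins import basestring  # resurrects Python 2 names on Python 3
    from past.builtins import long
    from past.builtins import unicode

    def describe(value):
      # isinstance checks written against the resurrected Python 2 names
      # keep working on Python 3, where long is int and unicode is str.
      if isinstance(value, basestring):
        return 'text'
      if isinstance(value, (int, long)):
        return 'integer'
      return type(value).__name__

    assert describe(u'abc') == 'text'
    assert describe(10**30) == 'integer'
    assert isinstance(u'caf\u00e9', unicode)

Compared with per-module try/except blocks, the import form gives one greppable convention and avoids a shim and a real builtin drifting apart across files.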