From e35dc23ae5449185c16b4b171777b9325dd8deab Mon Sep 17 00:00:00 2001 From: cclauss Date: Tue, 6 Mar 2018 19:10:07 +0100 Subject: [PATCH] [BEAM-1251] Fix basestring, file(), reduce(), and xrange() for Python 3 --- .../apache_beam/examples/snippets/snippets_test.py | 2 +- sdks/python/apache_beam/io/filebasedsink.py | 10 ++++++---- sdks/python/apache_beam/io/filebasedsource.py | 5 +++-- sdks/python/apache_beam/io/filesystem.py | 3 ++- sdks/python/apache_beam/io/gcp/bigquery.py | 6 ++++-- sdks/python/apache_beam/io/vcfio.py | 4 +++- sdks/python/apache_beam/pvalue.py | 4 +++- sdks/python/apache_beam/runners/common.py | 4 ++-- sdks/python/apache_beam/testing/util.py | 9 +++++---- sdks/python/apache_beam/transforms/core.py | 4 +++- sdks/python/apache_beam/transforms/util.py | 2 +- 11 files changed, 33 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 19d77d9c4a97..349d52542da5 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -900,7 +900,7 @@ def test_combine_reduce(self): import functools import operator product = factors | beam.CombineGlobally( - functools.partial(reduce, operator.mul), 1) + functools.partial(functools.reduce, operator.mul), 1) # [END combine_reduce] self.assertEqual([210], product) diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index 108165d06044..ab3ab5fd5b32 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -25,6 +25,8 @@ import time import uuid +from six import string_types + from apache_beam.internal import util from apache_beam.io import iobase from apache_beam.io.filesystem import BeamIOError @@ -73,10 +75,10 @@ def __init__(self, ~exceptions.ValueError: if **shard_name_template** is not of expected format. 
""" - if not isinstance(file_path_prefix, (basestring, ValueProvider)): + if not isinstance(file_path_prefix, (string_types, ValueProvider)): raise TypeError('file_path_prefix must be a string or ValueProvider;' 'got %r instead' % file_path_prefix) - if not isinstance(file_name_suffix, (basestring, ValueProvider)): + if not isinstance(file_name_suffix, (string_types, ValueProvider)): raise TypeError('file_name_suffix must be a string or ValueProvider;' 'got %r instead' % file_name_suffix) @@ -87,9 +89,9 @@ def __init__(self, shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE elif shard_name_template == '': num_shards = 1 - if isinstance(file_path_prefix, basestring): + if isinstance(file_path_prefix, string_types): file_path_prefix = StaticValueProvider(str, file_path_prefix) - if isinstance(file_name_suffix, basestring): + if isinstance(file_name_suffix, string_types): file_name_suffix = StaticValueProvider(str, file_name_suffix) self.file_path_prefix = file_path_prefix self.file_name_suffix = file_name_suffix diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a80896c78181..134800c3fff5 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -27,6 +27,7 @@ """ from six import integer_types +from six import string_types from apache_beam.internal import pickler from apache_beam.io import concat_source @@ -98,12 +99,12 @@ def __init__(self, result. 
""" - if not isinstance(file_pattern, (basestring, ValueProvider)): + if not isinstance(file_pattern, (string_types, ValueProvider)): raise TypeError('%s: file_pattern must be of type string' ' or ValueProvider; got %r instead' % (self.__class__.__name__, file_pattern)) - if isinstance(file_pattern, basestring): + if isinstance(file_pattern, string_types): file_pattern = StaticValueProvider(str, file_pattern) self._pattern = file_pattern diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 5b053df53013..28a0c434dc53 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -27,6 +27,7 @@ import zlib from six import integer_types +from six import string_types from apache_beam.utils.plugin import BeamPlugin @@ -373,7 +374,7 @@ class FileMetadata(object): """Metadata about a file path that is the output of FileSystem.match """ def __init__(self, path, size_in_bytes): - assert isinstance(path, basestring) and path, "Path should be a string" + assert isinstance(path, string_types) and path, "Path should be a string" assert isinstance(size_in_bytes, integer_types) and size_in_bytes >= 0, \ "Invalid value for size_in_bytes should %s (of type %s)" % ( size_in_bytes, type(size_in_bytes)) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index a79ad5e39859..78955af79a11 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -120,6 +120,8 @@ import time import uuid +from six import string_types + from apache_beam import coders from apache_beam.internal.gcp import auth from apache_beam.internal.gcp.json_value import from_json_value @@ -522,7 +524,7 @@ def __init__(self, table, dataset=None, project=None, schema=None, self.table_reference = _parse_table_reference(table, dataset, project) # Transform the table schema into a bigquery.TableSchema instance. 
- if isinstance(schema, basestring): + if isinstance(schema, string_types): # TODO(silviuc): Should add a regex-based validation of the format. table_schema = bigquery.TableSchema() schema_list = [s.strip(' ') for s in schema.split(',')] @@ -1413,7 +1415,7 @@ def get_dict_table_schema(schema): return schema elif schema is None: return schema - elif isinstance(schema, basestring): + elif isinstance(schema, string_types): table_schema = WriteToBigQuery.get_table_schema_from_string(schema) return WriteToBigQuery.table_schema_to_dict(table_schema) elif isinstance(schema, bigquery.TableSchema): diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py index a0206d450762..94c740b99640 100644 --- a/sdks/python/apache_beam/io/vcfio.py +++ b/sdks/python/apache_beam/io/vcfio.py @@ -26,6 +26,8 @@ import traceback from collections import namedtuple +from six import integer_types, string_types + import vcf from apache_beam.coders import coders @@ -404,7 +406,7 @@ def _convert_to_variant_record(self, record, infos, formats): # Note: this is already done for INFO fields in PyVCF. 
 if (field in formats and formats[field].num is None and - isinstance(data, (int, float, long, basestring, bool))): + isinstance(data, (float, bool) + integer_types + string_types)): data = [data] call.info[field] = data variant.calls.append(call) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 236c14bcbc60..462b4eab4591 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -29,6 +29,8 @@ import collections import itertools +from six import string_types + from apache_beam import coders from apache_beam import typehints from apache_beam.internal import pickler @@ -259,7 +261,7 @@ class TaggedOutput(object): """ def __init__(self, tag, value): - if not isinstance(tag, basestring): + if not isinstance(tag, string_types): raise TypeError( 'Attempting to create a TaggedOutput with non-string tag %s' % tag) self.tag = tag diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 124d7d3da852..0bf5bac88b5d 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -571,7 +571,7 @@ def process_outputs(self, windowed_input_element, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, basestring): + if not isinstance(tag, six.string_types): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value if isinstance(result, WindowedValue): @@ -613,7 +613,7 @@ def finish_bundle_outputs(self, results): tag = None if isinstance(result, TaggedOutput): tag = result.tag - if not isinstance(tag, basestring): + if not isinstance(tag, six.string_types): raise TypeError('In %s, tag %s is not a string' % (self, tag)) result = result.value diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index c08d250e1165..e442425505cc 100644 --- a/sdks/python/apache_beam/testing/util.py +++ 
b/sdks/python/apache_beam/testing/util.py @@ -155,8 +155,9 @@ def default_label(self): @experimental() def open_shards(glob_pattern): """Returns a composite file of all shards matching the given glob pattern.""" - with tempfile.NamedTemporaryFile(delete=False) as f: + with tempfile.NamedTemporaryFile(delete=False) as out_file: for shard in glob.glob(glob_pattern): - f.write(file(shard).read()) - concatenated_file_name = f.name - return file(concatenated_file_name, 'rb') + with open(shard, 'rb') as in_file: + out_file.write(in_file.read()) + concatenated_file_name = out_file.name + return open(concatenated_file_name, 'rb') diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2d411ee75331..eefbc85f5427 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -23,6 +23,8 @@ import inspect import types +from six import string_types + from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints @@ -1721,7 +1723,7 @@ def __init__(self, value): value: An object of values for the PCollection """ super(Create, self).__init__() - if isinstance(value, basestring): + if isinstance(value, string_types): raise TypeError('PTransform Create: Refusing to treat string as ' 'an iterable. (string=%r)' % value) elif isinstance(value, dict): diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 7bd5d45fa4b5..2be94332cf3c 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -149,7 +149,7 @@ def _merge_tagged_vals_under_key(key_grouped, result_ctor, # pairs. The result value constructor makes tuples with len(pcolls) slots. 
pcolls = list(enumerate(pcolls)) result_ctor_arg = len(pcolls) - result_ctor = lambda size: tuple([] for _ in xrange(size)) + result_ctor = lambda size: tuple([] for _ in range(size)) # Check input PCollections for PCollection-ness, and that they all belong # to the same pipeline.