Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ def test_combine_reduce(self):
import functools
import operator
product = factors | beam.CombineGlobally(
functools.partial(reduce, operator.mul), 1)
functools.partial(functools.reduce, operator.mul), 1)
# [END combine_reduce]
self.assertEqual([210], product)

Expand Down
10 changes: 6 additions & 4 deletions sdks/python/apache_beam/io/filebasedsink.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import time
import uuid

from six import string_types

from apache_beam.internal import util
from apache_beam.io import iobase
from apache_beam.io.filesystem import BeamIOError
Expand Down Expand Up @@ -73,10 +75,10 @@ def __init__(self,
~exceptions.ValueError: if **shard_name_template** is not of expected
format.
"""
if not isinstance(file_path_prefix, (basestring, ValueProvider)):
if not isinstance(file_path_prefix, (string_types, ValueProvider)):
raise TypeError('file_path_prefix must be a string or ValueProvider;'
'got %r instead' % file_path_prefix)
if not isinstance(file_name_suffix, (basestring, ValueProvider)):
if not isinstance(file_name_suffix, (string_types, ValueProvider)):
raise TypeError('file_name_suffix must be a string or ValueProvider;'
'got %r instead' % file_name_suffix)

Expand All @@ -87,9 +89,9 @@ def __init__(self,
shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
elif shard_name_template == '':
num_shards = 1
if isinstance(file_path_prefix, basestring):
if isinstance(file_path_prefix, string_types):
file_path_prefix = StaticValueProvider(str, file_path_prefix)
if isinstance(file_name_suffix, basestring):
if isinstance(file_name_suffix, string_types):
file_name_suffix = StaticValueProvider(str, file_name_suffix)
self.file_path_prefix = file_path_prefix
self.file_name_suffix = file_name_suffix
Expand Down
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/io/filebasedsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"""

from six import integer_types
from six import string_types

from apache_beam.internal import pickler
from apache_beam.io import concat_source
Expand Down Expand Up @@ -98,12 +99,12 @@ def __init__(self,
result.
"""

if not isinstance(file_pattern, (basestring, ValueProvider)):
if not isinstance(file_pattern, (string_types, ValueProvider)):
raise TypeError('%s: file_pattern must be of type string'
' or ValueProvider; got %r instead'
% (self.__class__.__name__, file_pattern))

if isinstance(file_pattern, basestring):
if isinstance(file_pattern, string_types):
file_pattern = StaticValueProvider(str, file_pattern)
self._pattern = file_pattern

Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import zlib

from six import integer_types
from six import string_types

from apache_beam.utils.plugin import BeamPlugin

Expand Down Expand Up @@ -373,7 +374,7 @@ class FileMetadata(object):
"""Metadata about a file path that is the output of FileSystem.match
"""
def __init__(self, path, size_in_bytes):
assert isinstance(path, basestring) and path, "Path should be a string"
assert isinstance(path, string_types) and path, "Path should be a string"
assert isinstance(size_in_bytes, integer_types) and size_in_bytes >= 0, \
"Invalid value for size_in_bytes should %s (of type %s)" % (
size_in_bytes, type(size_in_bytes))
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
import time
import uuid

from six import string_types

from apache_beam import coders
from apache_beam.internal.gcp import auth
from apache_beam.internal.gcp.json_value import from_json_value
Expand Down Expand Up @@ -522,7 +524,7 @@ def __init__(self, table, dataset=None, project=None, schema=None,

self.table_reference = _parse_table_reference(table, dataset, project)
# Transform the table schema into a bigquery.TableSchema instance.
if isinstance(schema, basestring):
if isinstance(schema, string_types):
# TODO(silviuc): Should add a regex-based validation of the format.
table_schema = bigquery.TableSchema()
schema_list = [s.strip(' ') for s in schema.split(',')]
Expand Down Expand Up @@ -1413,7 +1415,7 @@ def get_dict_table_schema(schema):
return schema
elif schema is None:
return schema
elif isinstance(schema, basestring):
elif isinstance(schema, string_types):
table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
return WriteToBigQuery.table_schema_to_dict(table_schema)
elif isinstance(schema, bigquery.TableSchema):
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/vcfio.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import traceback
from collections import namedtuple

from six import string_types

import vcf

from apache_beam.coders import coders
Expand Down Expand Up @@ -404,7 +406,7 @@ def _convert_to_variant_record(self, record, infos, formats):
# Note: this is already done for INFO fields in PyVCF.
if (field in formats and
formats[field].num is None and
isinstance(data, (int, float, long, basestring, bool))):
isinstance(data, (int, float, long, string_types, bool))):
data = [data]
call.info[field] = data
variant.calls.append(call)
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/pvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import collections
import itertools

from six import string_types

from apache_beam import coders
from apache_beam import typehints
from apache_beam.internal import pickler
Expand Down Expand Up @@ -259,7 +261,7 @@ class TaggedOutput(object):
"""

def __init__(self, tag, value):
if not isinstance(tag, basestring):
if not isinstance(tag, string_types):
raise TypeError(
'Attempting to create a TaggedOutput with non-string tag %s' % tag)
self.tag = tag
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def process_outputs(self, windowed_input_element, results):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
if not isinstance(tag, basestring):
if not isinstance(tag, six.string_types):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
Expand Down Expand Up @@ -613,7 +613,7 @@ def finish_bundle_outputs(self, results):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
if not isinstance(tag, basestring):
if not isinstance(tag, six.string_types):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value

Expand Down
9 changes: 5 additions & 4 deletions sdks/python/apache_beam/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ def default_label(self):
@experimental()
def open_shards(glob_pattern):
  """Returns a composite file of all shards matching the given glob pattern.

  The contents of every file matched by ``glob_pattern`` are appended, in the
  order returned by ``glob.glob``, to a newly created named temporary file.

  Args:
    glob_pattern: glob expression selecting the shard files to concatenate.

  Returns:
    A file object, opened in binary read mode (``'rb'``), over the
    concatenated contents. The backing temporary file is created with
    ``delete=False``, so it is not removed automatically when closed.
  """
  with tempfile.NamedTemporaryFile(delete=False) as out_file:
    for shard in glob.glob(glob_pattern):
      # NamedTemporaryFile defaults to binary mode ('w+b'), so shards must be
      # read as bytes as well; reading them in text mode makes write() fail
      # with a TypeError on Python 3.
      with open(shard, 'rb') as in_file:
        out_file.write(in_file.read())
    concatenated_file_name = out_file.name
  # Reopen only after the writer has been closed so all data is flushed.
  return open(concatenated_file_name, 'rb')
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import inspect
import types

from six import string_types

from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
Expand Down Expand Up @@ -1721,7 +1723,7 @@ def __init__(self, value):
value: An object of values for the PCollection
"""
super(Create, self).__init__()
if isinstance(value, basestring):
if isinstance(value, string_types):
raise TypeError('PTransform Create: Refusing to treat string as '
'an iterable. (string=%r)' % value)
elif isinstance(value, dict):
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _merge_tagged_vals_under_key(key_grouped, result_ctor,
# pairs. The result value constructor makes tuples with len(pcolls) slots.
pcolls = list(enumerate(pcolls))
result_ctor_arg = len(pcolls)
result_ctor = lambda size: tuple([] for _ in xrange(size))
result_ctor = lambda size: tuple([] for _ in range(size))

# Check input PCollections for PCollection-ness, and that they all belong
# to the same pipeline.
Expand Down