1 change: 0 additions & 1 deletion sdks/python/apache_beam/coders/coder_impl.pxd
@@ -68,7 +68,6 @@ cdef class DeterministicFastPrimitivesCoderImpl(CoderImpl):
cdef bint _check_safe(self, value) except -1


-cdef object NoneType
cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE
cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE

10 changes: 5 additions & 5 deletions sdks/python/apache_beam/coders/coder_impl.py
@@ -28,7 +28,7 @@
"""
from __future__ import absolute_import

-from types import NoneType
+import six

from apache_beam.coders import observable
from apache_beam.utils import windowed_value
@@ -197,7 +197,7 @@ def __init__(self, coder, step_label):
self._step_label = step_label

def _check_safe(self, value):
-if isinstance(value, (str, unicode, long, int, float)):
+if isinstance(value, (str, six.text_type, long, int, float)):
pass
elif value is None:
pass
@@ -277,7 +277,7 @@ def get_estimated_size_and_observables(self, value, nested=False):

def encode_to_stream(self, value, stream, nested):
t = type(value)
-if t is NoneType:
+if value is None:
stream.write_byte(NONE_TYPE)
elif t is int:
stream.write_byte(INT_TYPE)
@@ -288,7 +288,7 @@ def encode_to_stream(self, value, stream, nested):
elif t is str:

Contributor: It may make sense to add an explicit byte test in coders_test.

stream.write_byte(STR_TYPE)
stream.write(value, nested)
-elif t is unicode:
+elif t is six.text_type:
unicode_value = value # for typing
stream.write_byte(UNICODE_TYPE)
stream.write(unicode_value.encode('utf-8'), nested)
@@ -302,7 +302,7 @@ def encode_to_stream(self, value, stream, nested):
dict_value = value # for typing
stream.write_byte(DICT_TYPE)
stream.write_var_int64(len(dict_value))
-for k, v in dict_value.iteritems():
+for k, v in six.iteritems(dict_value):
self.encode_to_stream(k, stream, True)
self.encode_to_stream(v, stream, True)
elif t is bool:
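Note on the substitutions above: a minimal standalone sketch of the three idioms this file now leans on — the None identity test, six.text_type, and six.iteritems. The tag function is hypothetical, not Beam code.

```python
import six

def tag(value):
    # `value is None` replaces the old `type(value) is NoneType` test;
    # None is a singleton, so the identity check is equivalent and portable.
    if value is None:
        return 'NONE'
    # six.text_type is unicode on Python 2 and str on Python 3.
    if isinstance(value, six.text_type):
        return 'UNICODE'
    if isinstance(value, dict):
        # six.iteritems calls dict.iteritems() on Python 2 and
        # iter(dict.items()) on Python 3, avoiding a list copy on Python 2.
        return sorted('%r -> %r' % (k, v) for k, v in six.iteritems(value))
    return 'OTHER'

print(tag(None))          # NONE
print(tag(u'caf\xe9'))    # UNICODE
print(tag({'a': 1}))      # ["'a' -> 1"]
```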
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/coders/coders.py
@@ -22,17 +22,22 @@
from __future__ import absolute_import

import base64
-import cPickle as pickle

+# pylint: disable=ungrouped-imports
import google.protobuf
+import six
from google.protobuf import wrappers_pb2

+import six.moves.cPickle as pickle
from apache_beam.coders import coder_impl
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.utils import proto_utils

+# pylint: enable=ungrouped-imports


# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from .stream import get_varint_size
@@ -309,7 +314,7 @@ class ToStringCoder(Coder):
"""A default string coder used if no sink coder is specified."""

def encode(self, value):
-if isinstance(value, unicode):
+if isinstance(value, six.text_type):
return value.encode('utf-8')
elif isinstance(value, str):
return value
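For reference, six.moves.cPickle resolves to cPickle on Python 2 and to plain pickle on Python 3, so the module keeps the fast pickler where one exists. A hedged sketch of the same encode fallback as above (to_bytes is a made-up name, not the Beam API):

```python
import six
import six.moves.cPickle as pickle  # cPickle on Py2, pickle on Py3

def to_bytes(value):
    # Text becomes UTF-8 bytes; byte strings pass through unchanged;
    # anything else is stringified first.
    if isinstance(value, six.text_type):
        return value.encode('utf-8')
    if isinstance(value, bytes):
        return value
    return str(value).encode('utf-8')

assert to_bytes(u'caf\xe9') == b'caf\xc3\xa9'
assert to_bytes(b'raw') == b'raw'
assert pickle.loads(pickle.dumps({'k': 1})) == {'k': 1}
```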
1 change: 0 additions & 1 deletion sdks/python/apache_beam/coders/fast_coders_test.py
@@ -20,7 +20,6 @@
import logging
import unittest
-
# Run all the standard coder test cases.
from apache_beam.coders.coders_test_common import *

Expand Down
1 change: 0 additions & 1 deletion sdks/python/apache_beam/coders/slow_coders_test.py
@@ -20,7 +20,6 @@
import logging
import unittest
-
# Run all the standard coder test cases.
from apache_beam.coders.coders_test_common import *

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/coders/stream_test.py
@@ -17,6 +17,8 @@

"""Tests for the stream implementations."""

+from __future__ import absolute_import
+
import logging
import math
import unittest
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/coders/typecoders.py
@@ -64,6 +64,10 @@ def MakeXyzs(v):
See apache_beam.typehints.decorators module for more details.
"""

+from __future__ import absolute_import
+
+import six
+
from apache_beam.coders import coders
from apache_beam.typehints import typehints

@@ -84,7 +88,7 @@ def register_standard_coders(self, fallback_coder):
self._register_coder_internal(float, coders.FloatCoder)
self._register_coder_internal(str, coders.BytesCoder)
self._register_coder_internal(bytes, coders.BytesCoder)
-self._register_coder_internal(unicode, coders.StrUtf8Coder)
+self._register_coder_internal(six.text_type, coders.StrUtf8Coder)
self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
# Default fallback coders applied in that order until the first matching
# coder found.
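A quick sanity check (standalone sketch, not registry code) of what these aliases name on each interpreter; note that on Python 3, str and six.text_type are the same type, so the str and six.text_type registrations above refer to the same key there.

```python
import sys
import six

if sys.version_info[0] == 2:
    # Python 2: str is the byte string type and six.text_type is unicode.
    assert str is bytes
    assert six.text_type.__name__ == 'unicode'
else:
    # Python 3: bytes is a distinct type, and six.text_type is simply str,
    # so registering both str and six.text_type touches the same type.
    assert str is not bytes
    assert six.text_type is str
```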
10 changes: 7 additions & 3 deletions sdks/python/apache_beam/io/avroio.py
@@ -42,7 +42,6 @@
Avro file.
"""

-import cStringIO
import os
import zlib
from functools import partial
@@ -60,6 +59,11 @@
from apache_beam.io.iobase import Read
from apache_beam.transforms import PTransform

+try:
+  from cStringIO import StringIO as BytesIO
+except ImportError:
+  from io import BytesIO

__all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro']


@@ -311,7 +315,7 @@ def _decompress_bytes(data, codec):
# We take care to avoid extra copies of data while slicing large objects
# by use of a buffer.
result = snappy.decompress(buffer(data)[:-4])
-avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
+avroio.BinaryDecoder(BytesIO(data[-4:])).check_crc32(result)
return result
else:
raise ValueError('Unknown codec: %r', codec)
@@ -321,7 +325,7 @@ def num_records(self):

def records(self):
decoder = avroio.BinaryDecoder(
-cStringIO.StringIO(self._decompressed_block_bytes))
+BytesIO(self._decompressed_block_bytes))
reader = avroio.DatumReader(
writers_schema=self._schema, readers_schema=self._schema)

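The import-time fallback above is the standard pattern for an in-memory byte stream on both interpreters; a standalone sketch follows. (The snappy branch above still uses the Python 2-only buffer builtin, so this file is not yet fully Python 3-clean.)

```python
try:
    # Python 2: cStringIO.StringIO is a fast byte-oriented buffer.
    from cStringIO import StringIO as BytesIO
except ImportError:
    # Python 3: io.BytesIO is the binary in-memory stream.
    from io import BytesIO

buf = BytesIO(b'\x00\x01\x02\x03')
assert buf.read(2) == b'\x00\x01'
assert buf.read() == b'\x02\x03'
```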
19 changes: 10 additions & 9 deletions sdks/python/apache_beam/io/concat_source_test.py
@@ -91,10 +91,10 @@ def test_conact_source(self):
RangeSource(12, 16),
])
self.assertEqual(list(source.read(source.get_range_tracker())),
-range(16))
+list(range(16)))
self.assertEqual(list(source.read(source.get_range_tracker((1, None),
(2, 10)))),
-range(4, 10))
+list(range(4, 10)))
range_tracker = source.get_range_tracker(None, None)
self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
@@ -176,10 +176,11 @@ def test_single_source(self):
read_all = source_test_utils.read_from_source

range10 = RangeSource(0, 10)
-self.assertEquals(read_all(ConcatSource([range10])), range(10))
-self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10))
+self.assertEquals(read_all(ConcatSource([range10])), list(range(10)))
+self.assertEquals(read_all(ConcatSource([range10]), (0, 5)),
+                  list(range(5, 10)))
self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)),
-range(5))
+list(range(5)))

def test_source_with_empty_ranges(self):
read_all = source_test_utils.read_from_source
@@ -189,11 +190,11 @@ def test_source_with_empty_ranges(self):

range10 = RangeSource(0, 10)
self.assertEquals(read_all(ConcatSource([empty, empty, range10])),
-range(10))
+list(range(10)))
self.assertEquals(read_all(ConcatSource([empty, range10, empty])),
-range(10))
+list(range(10)))
self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])),
-range(10) + range(10))
+list(range(10)) + list(range(10)))

def test_source_with_empty_ranges_exhastive(self):
empty = RangeSource(0, 0)
@@ -214,7 +215,7 @@ def test_run_concat_direct(self):
])
pipeline = TestPipeline()
pcoll = pipeline | beam.io.Read(source)
-assert_that(pcoll, equal_to(range(1000)))
+assert_that(pcoll, equal_to(list(range(1000))))

pipeline.run()

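Why every range(...) in these assertions gained a list(...) wrapper: on Python 3, range returns a lazy sequence object that never compares equal to a list and does not support + concatenation. A small demonstration (Python 3 semantics):

```python
r = range(3)
assert list(r) == [0, 1, 2]   # materialize before comparing
assert r != [0, 1, 2]         # a range object is not a list on Python 3
try:
    range(2) + range(2)       # concatenation only worked on Py2, where
except TypeError:            # range() returned a real list
    combined = list(range(2)) + list(range(2))
assert combined == [0, 1, 0, 1]
```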
14 changes: 8 additions & 6 deletions sdks/python/apache_beam/io/filebasedsink.py
@@ -25,6 +25,8 @@
import time
import uuid

+import six

from apache_beam.internal import util
from apache_beam.io import iobase
from apache_beam.io.filesystem import BeamIOError
@@ -73,10 +75,10 @@ def __init__(self,
~exceptions.ValueError: if **shard_name_template** is not of expected
format.
"""
-if not isinstance(file_path_prefix, (basestring, ValueProvider)):
+if not isinstance(file_path_prefix, (six.string_types, ValueProvider)):

Contributor: Another option is to just use from past.builtins import basestring to reduce the number of lines that need to change, but this is fine as well.

Contributor (author): I think that would be good. Are we OK with adding future as a dependency to master? Or should we get a separate Python 3 branch first?

Contributor: I think future is a reasonable dependency to take in master. The idea of a Python 3 branch could also be good if we continue to take a long time to review and merge Python 3 fixes into master (although then we would need people to cooperate on a Python 3 branch explicitly).

raise TypeError('file_path_prefix must be a string or ValueProvider;'
'got %r instead' % file_path_prefix)
-if not isinstance(file_name_suffix, (basestring, ValueProvider)):
+if not isinstance(file_name_suffix, (six.string_types, ValueProvider)):
raise TypeError('file_name_suffix must be a string or ValueProvider;'
'got %r instead' % file_name_suffix)

@@ -87,9 +89,9 @@ def __init__(self,
shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
elif shard_name_template == '':
num_shards = 1
-if isinstance(file_path_prefix, basestring):
+if isinstance(file_path_prefix, six.string_types):
file_path_prefix = StaticValueProvider(str, file_path_prefix)
-if isinstance(file_name_suffix, basestring):
+if isinstance(file_name_suffix, six.string_types):
file_name_suffix = StaticValueProvider(str, file_name_suffix)
self.file_path_prefix = file_path_prefix
self.file_name_suffix = file_name_suffix
@@ -221,7 +223,7 @@ def _rename_batch(batch):
except BeamIOError as exp:
if exp.exception_details is None:
raise
-for (src, dest), exception in exp.exception_details.iteritems():
+for (src, dest), exception in exp.exception_details.items():
if exception:
logging.warning('Rename not successful: %s -> %s, %s', src, dest,
exception)
@@ -243,7 +245,7 @@ def _rename_batch(batch):
return exceptions

exception_batches = util.run_using_threadpool(
-_rename_batch, zip(source_file_batch, destination_file_batch),
+_rename_batch, list(zip(source_file_batch, destination_file_batch)),
num_threads)

all_exceptions = [e for exception_batch in exception_batches
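For context, six.string_types is (basestring,) on Python 2 and (str,) on Python 3, while dict.iteritems() and lazy zip are the other two Python 2-isms swapped out above. A hedged standalone sketch (check_prefix is illustrative, not the Beam API):

```python
import six

def check_prefix(file_path_prefix):
    # (basestring,) on Py2 accepts both str and unicode; (str,) on Py3.
    if not isinstance(file_path_prefix, six.string_types):
        raise TypeError('file_path_prefix must be a string; '
                        'got %r instead' % (file_path_prefix,))
    return file_path_prefix

assert check_prefix('gs://bucket/output') == 'gs://bucket/output'

# dict.items() exists on both versions; iteritems() is Py2-only.
details = {('a', 'b'): None}
for (src, dest), exception in details.items():
    assert exception is None

# zip() is lazy on Py3, so callers that need a real sequence must listify.
pairs = list(zip(['s1', 's2'], ['d1', 'd2']))
assert pairs == [('s1', 'd1'), ('s2', 'd2')]
```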
12 changes: 7 additions & 5 deletions sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,9 @@
:class:`~apache_beam.io._AvroSource`.
"""

-from six import integer_types

Contributor: Would it maybe make sense to import integer_types and string_types with the from directive here? (Or, I guess, what is the reason for the change?)

Contributor (author): Either way works. I just chose one way to keep things consistent after applying the auto-converter. Is there anything I'm missing?


+from __future__ import absolute_import
+
+import six

from apache_beam.internal import pickler
from apache_beam.io import concat_source
@@ -98,12 +100,12 @@ def __init__(self,
result.
"""

-if not isinstance(file_pattern, (basestring, ValueProvider)):
+if not isinstance(file_pattern, (six.string_types, ValueProvider)):
raise TypeError('%s: file_pattern must be of type string'
' or ValueProvider; got %r instead'
% (self.__class__.__name__, file_pattern))

-if isinstance(file_pattern, basestring):
+if isinstance(file_pattern, six.string_types):
file_pattern = StaticValueProvider(str, file_pattern)
self._pattern = file_pattern

@@ -234,11 +236,11 @@ class _SingleFileSource(iobase.BoundedSource):

def __init__(self, file_based_source, file_name, start_offset, stop_offset,
min_bundle_size=0, splittable=True):
-if not isinstance(start_offset, integer_types):
+if not isinstance(start_offset, six.integer_types):
raise TypeError(
'start_offset must be a number. Received: %r' % start_offset)
if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY:
-if not isinstance(stop_offset, integer_types):
+if not isinstance(stop_offset, six.integer_types):
raise TypeError(
'stop_offset must be a number. Received: %r' % stop_offset)
if start_offset >= stop_offset:
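Similarly, six.integer_types is (int, long) on Python 2 and just (int,) on Python 3, where the long type no longer exists. A minimal restatement of the offset check (hypothetical helper, not the Beam class):

```python
import six

def check_offset(offset):
    # Accepts int, and on Python 2 also long; rejects floats and strings.
    if not isinstance(offset, six.integer_types):
        raise TypeError('offset must be a number. Received: %r' % offset)
    return offset

assert check_offset(42) == 42
try:
    check_offset(4.2)
except TypeError:
    pass  # expected: floats are not valid offsets
```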
16 changes: 8 additions & 8 deletions sdks/python/apache_beam/io/filebasedsource_test.py
@@ -183,16 +183,16 @@ def setUp(self):
filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2

def test_read(self):
-sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
-           in [0, 10, 20]]
+sources = [TestConcatSource.DummySource(range(start, start + 10))
+           for start in [0, 10, 20]]
concat = ConcatSource(sources)
range_tracker = concat.get_range_tracker(None, None)
read_data = [value for value in concat.read(range_tracker)]
-self.assertItemsEqual(range(30), read_data)
+self.assertItemsEqual(list(range(30)), read_data)

def test_split(self):
-sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
-           in [0, 10, 20]]
+sources = [TestConcatSource.DummySource(range(start, start + 10))
+           for start in [0, 10, 20]]
concat = ConcatSource(sources)
splits = [split for split in concat.split()]
self.assertEquals(6, len(splits))
@@ -205,11 +205,11 @@ def test_split(self):
split.stop_position)
read_data.extend([value for value in split.source.read(
range_tracker_for_split)])
-self.assertItemsEqual(range(30), read_data)
+self.assertItemsEqual(list(range(30)), read_data)

def test_estimate_size(self):
-sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
-           in [0, 10, 20]]
+sources = [TestConcatSource.DummySource(range(start, start + 10))
+           for start in [0, 10, 20]]
concat = ConcatSource(sources)
self.assertEquals(30, concat.estimate_size())

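One caveat these test changes leave open: assertItemsEqual exists only on Python 2; Python 3's unittest renamed it to assertCountEqual, so the assertions above would still need that rename (or the six helper) to run under Python 3. A sketch of the portable spelling:

```python
import unittest

import six

class RangeAssertionTest(unittest.TestCase):
    def test_items_equal(self):
        # six.assertCountEqual dispatches to assertItemsEqual on Py2
        # and to assertCountEqual on Py3.
        six.assertCountEqual(self, list(range(3)), [2, 0, 1])

if __name__ == '__main__':
    unittest.main()
```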