Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"""A package defining several input sources and output sinks."""

# pylint: disable=wildcard-import
from __future__ import absolute_import

from apache_beam.io.avroio import *
from apache_beam.io.filebasedsink import *
from apache_beam.io.iobase import Read
Expand Down
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/io/avroio.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
that can be used to write a given ``PCollection`` of Python objects to an
Avro file.
"""
from __future__ import absolute_import

import cStringIO
import io
import os
import zlib
from builtins import object
from functools import partial

import avro
Expand Down Expand Up @@ -341,7 +343,7 @@ def _decompress_bytes(data, codec):
# We take care to avoid extra copies of data while slicing large objects
# by use of a buffer.
result = snappy.decompress(buffer(data)[:-4])
avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
avroio.BinaryDecoder(io.BytesIO(data[-4:])).check_crc32(result)
return result
else:
raise ValueError('Unknown codec: %r' % codec)
Expand All @@ -351,7 +353,7 @@ def num_records(self):

def records(self):
decoder = avroio.BinaryDecoder(
cStringIO.StringIO(self._decompressed_block_bytes))
io.BytesIO(self._decompressed_block_bytes))
reader = avroio.DatumReader(
writers_schema=self._schema, readers_schema=self._schema)

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/avroio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import

import json
import logging
import os
import tempfile
import unittest
from builtins import range

import avro.datafile
import avro.schema
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/concat_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

Concat Source, which reads the union of several other sources.
"""
from __future__ import absolute_import
from __future__ import division

import bisect
import threading
from builtins import range

from apache_beam.io import iobase

Expand Down
22 changes: 13 additions & 9 deletions sdks/python/apache_beam/io/concat_source_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
#

"""Unit tests for the sources framework."""
from __future__ import absolute_import
from __future__ import division

import logging
import unittest
from builtins import range

import apache_beam as beam
from apache_beam.io import iobase
Expand Down Expand Up @@ -91,10 +94,10 @@ def test_conact_source(self):
RangeSource(12, 16),
])
self.assertEqual(list(source.read(source.get_range_tracker())),
range(16))
list(range(16)))
self.assertEqual(list(source.read(source.get_range_tracker((1, None),
(2, 10)))),
range(4, 10))
list(range(4, 10)))
range_tracker = source.get_range_tracker(None, None)
self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
Expand Down Expand Up @@ -176,10 +179,11 @@ def test_single_source(self):
read_all = source_test_utils.read_from_source

range10 = RangeSource(0, 10)
self.assertEquals(read_all(ConcatSource([range10])), range(10))
self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10))
self.assertEquals(read_all(ConcatSource([range10])), list(range(10)))
self.assertEquals(read_all(ConcatSource([range10]), (0, 5)),
list(range(5, 10)))
self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)),
range(5))
list(range(5)))

def test_source_with_empty_ranges(self):
read_all = source_test_utils.read_from_source
Expand All @@ -189,11 +193,11 @@ def test_source_with_empty_ranges(self):

range10 = RangeSource(0, 10)
self.assertEquals(read_all(ConcatSource([empty, empty, range10])),
range(10))
list(range(10)))
self.assertEquals(read_all(ConcatSource([empty, range10, empty])),
range(10))
list(range(10)))
self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])),
range(10) + range(10))
list(range(10)) + list(range(10)))

def test_source_with_empty_ranges_exhastive(self):
empty = RangeSource(0, 0)
Expand All @@ -214,7 +218,7 @@ def test_run_concat_direct(self):
])
pipeline = TestPipeline()
pcoll = pipeline | beam.io.Read(source)
assert_that(pcoll, equal_to(range(1000)))
assert_that(pcoll, equal_to(list(range(1000))))

pipeline.run()

Expand Down
21 changes: 14 additions & 7 deletions sdks/python/apache_beam/io/filebasedsink.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import re
import time
import uuid
from builtins import range
from builtins import zip

from six import string_types
from future.utils import iteritems

from apache_beam.internal import util
from apache_beam.io import iobase
Expand All @@ -41,6 +43,11 @@

__all__ = ['FileBasedSink']

try:
unicode # pylint: disable=unicode-builtin
except NameError:
unicode = str


class FileBasedSink(iobase.Sink):
"""A sink to a GCS or local files.
Expand Down Expand Up @@ -75,10 +82,10 @@ def __init__(self,
~exceptions.ValueError: if **shard_name_template** is not of expected
format.
"""
if not isinstance(file_path_prefix, (string_types, ValueProvider)):
if not isinstance(file_path_prefix, ((str, unicode), ValueProvider)):
raise TypeError('file_path_prefix must be a string or ValueProvider;'
'got %r instead' % file_path_prefix)
if not isinstance(file_name_suffix, (string_types, ValueProvider)):
if not isinstance(file_name_suffix, ((str, unicode), ValueProvider)):
raise TypeError('file_name_suffix must be a string or ValueProvider;'
'got %r instead' % file_name_suffix)

Expand All @@ -89,9 +96,9 @@ def __init__(self,
shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
elif shard_name_template == '':
num_shards = 1
if isinstance(file_path_prefix, string_types):
if isinstance(file_path_prefix, (str, unicode)):
file_path_prefix = StaticValueProvider(str, file_path_prefix)
if isinstance(file_name_suffix, string_types):
if isinstance(file_name_suffix, (str, unicode)):
file_name_suffix = StaticValueProvider(str, file_name_suffix)
self.file_path_prefix = file_path_prefix
self.file_name_suffix = file_name_suffix
Expand Down Expand Up @@ -297,7 +304,7 @@ def _rename_batch(batch):
except BeamIOError as exp:
if exp.exception_details is None:
raise
for (src, dst), exception in exp.exception_details.iteritems():
for (src, dst), exception in iteritems(exp.exception_details):
if exception:
logging.error(('Exception in _rename_batch. src: %s, '
'dst: %s, err: %s'), src, dst, exception)
Expand All @@ -307,7 +314,7 @@ def _rename_batch(batch):
return exceptions

exception_batches = util.run_using_threadpool(
_rename_batch, zip(source_file_batch, destination_file_batch),
_rename_batch, list(zip(source_file_batch, destination_file_batch)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC run_using_threadpool accepts any iterable, so forcing a list is redundant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading util.run_using_threadpool, it looks like the code calls len(inputs), which may not be compatible with all iterables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, please discard my initial comment.

num_threads)

all_exceptions = [e for exception_batch in exception_batches
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/filebasedsink_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

"""Unit tests for file sinks."""

from __future__ import absolute_import

import glob
import logging
import os
import shutil
import tempfile
import unittest
from builtins import range

import hamcrest as hc
import mock
Expand Down
14 changes: 8 additions & 6 deletions sdks/python/apache_beam/io/filebasedsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
:class:`~apache_beam.io._AvroSource`.
"""

from six import integer_types
from six import string_types
from __future__ import absolute_import

from past.builtins import long
from past.builtins import unicode

from apache_beam.internal import pickler
from apache_beam.io import concat_source
Expand Down Expand Up @@ -99,12 +101,12 @@ def __init__(self,
result.
"""

if not isinstance(file_pattern, (string_types, ValueProvider)):
if not isinstance(file_pattern, ((str, unicode), ValueProvider)):
raise TypeError('%s: file_pattern must be of type string'
' or ValueProvider; got %r instead'
% (self.__class__.__name__, file_pattern))

if isinstance(file_pattern, string_types):
if isinstance(file_pattern, (str, unicode)):
file_pattern = StaticValueProvider(str, file_pattern)
self._pattern = file_pattern

Expand Down Expand Up @@ -235,11 +237,11 @@ class _SingleFileSource(iobase.BoundedSource):

def __init__(self, file_based_source, file_name, start_offset, stop_offset,
min_bundle_size=0, splittable=True):
if not isinstance(start_offset, integer_types):
if not isinstance(start_offset, (int, long)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you import from past you can replace this with just int.

See http://python-future.org/compatible_idioms.html#long-integers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to replace this with just int, we would have to import int from (future.)builtins, which gives problems when used for typechecks. To stay consistent with other modules, which do use typechecks, I would advice against this.

Instead, we can replace the try/except block with from past.builtins import long as mentioned in the comment above.

raise TypeError(
'start_offset must be a number. Received: %r' % start_offset)
if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY:
if not isinstance(stop_offset, integer_types):
if not isinstance(stop_offset, (int, long)):
raise TypeError(
'stop_offset must be a number. Received: %r' % stop_offset)
if start_offset >= stop_offset:
Expand Down
29 changes: 16 additions & 13 deletions sdks/python/apache_beam/io/filebasedsource_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
Expand All @@ -15,15 +14,19 @@
# limitations under the License.
#

from __future__ import absolute_import

import bz2
import cStringIO
import gzip
import io
import logging
import math
import os
import random
import tempfile
import unittest
from builtins import object
from builtins import range

import hamcrest as hc

Expand Down Expand Up @@ -153,7 +156,7 @@ def __init__(self, values):
def split(self, desired_bundle_size, start_position=None,
stop_position=None):
# simply devides values into two bundles
middle = len(self._values) / 2
middle = len(self._values) // 2
yield iobase.SourceBundle(0.5, TestConcatSource.DummySource(
self._values[:middle]), None, None)
yield iobase.SourceBundle(0.5, TestConcatSource.DummySource(
Expand Down Expand Up @@ -188,11 +191,11 @@ def test_read(self):
concat = ConcatSource(sources)
range_tracker = concat.get_range_tracker(None, None)
read_data = [value for value in concat.read(range_tracker)]
self.assertItemsEqual(range(30), read_data)
self.assertItemsEqual(list(range(30)), read_data)

def test_split(self):
sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
in [0, 10, 20]]
sources = [TestConcatSource.DummySource(list(range(start, start + 10)))
for start in [0, 10, 20]]
concat = ConcatSource(sources)
splits = [split for split in concat.split()]
self.assertEquals(6, len(splits))
Expand All @@ -205,7 +208,7 @@ def test_split(self):
split.stop_position)
read_data.extend([value for value in split.source.read(
range_tracker_for_split)])
self.assertItemsEqual(range(30), read_data)
self.assertItemsEqual(list(range(30)), read_data)

def test_estimate_size(self):
sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
Expand Down Expand Up @@ -473,8 +476,8 @@ def test_read_pattern_gzip(self):
chunks = [lines[splits[i-1]:splits[i]] for i in range(1, len(splits))]
compressed_chunks = []
for c in chunks:
out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(compressed_chunks)
Expand Down Expand Up @@ -520,8 +523,8 @@ def test_read_auto_pattern(self):
chunks = [lines[splits[i - 1]:splits[i]] for i in range(1, len(splits))]
compressed_chunks = []
for c in chunks:
out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(
Expand All @@ -540,8 +543,8 @@ def test_read_auto_pattern_compressed_and_uncompressed(self):
chunks_to_write = []
for i, c in enumerate(chunks):
if i%2 == 0:
out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write('\n'.join(c))
chunks_to_write.append(out.getvalue())
else:
Expand Down
Loading