diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 31e4392f6454..1e8603ab3318 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -35,6 +35,8 @@ class RangeSource(iobase.BoundedSource): + __hash__ = None + def __init__(self, start, end, split_freq=1): assert start <= end self._start = start @@ -76,7 +78,9 @@ def read(self, range_tracker): # For testing def __eq__(self, other): return (type(self) == type(other) - and self._start == other._start and self._end == other._end) + and self._start == other._start + and self._end == other._end + and self._split_freq == other._split_freq) def __ne__(self, other): return not self == other diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index dc6726e6442a..5a09582a1c33 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -60,6 +60,7 @@ class FileBasedSink(iobase.Sink): # Max number of threads to be used for renaming. _MAX_RENAME_THREADS = 64 + __hash__ = None def __init__(self, file_path_prefix, diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index e9312238f5b5..71498d062a0f 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -15,6 +15,7 @@ # from __future__ import absolute_import +from __future__ import division import bz2 import gzip diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py index 437f38850bac..66cca24f1936 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py @@ -22,6 +22,7 @@ import logging import time from builtins import object +from builtins import round from apache_beam.io.gcp.datastore.v1 import helper from apache_beam.io.gcp.datastore.v1 import query_splitter diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index a27df09ac0e0..5fdc3a7aeff1 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -20,6 +20,8 @@ For internal use only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import + import errno import logging import sys diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py index 7723fb7b6465..ef3c1e4a18f5 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py @@ -20,6 +20,7 @@ from __future__ import division from builtins import range +from builtins import round from apache_beam.io.gcp.datastore.v1 import helper diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py index 8f17c21533c8..8b05ebef0998 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py @@ -16,6 +16,8 @@ # """Tests for util.py.""" +from __future__ import absolute_import + import unittest from apache_beam.io.gcp.datastore.v1 import util diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 88f7ce93f46a..757475d2db61 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -18,6 +18,8 @@ """Unit tests for GCS File System.""" +from __future__ import absolute_import + import logging import unittest from builtins import zip diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index b10926c41906..0a0b16d63be1 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -15,6 +15,7 @@ # limitations under the License. # """Tests for Google Cloud Storage client.""" +from __future__ import absolute_import from __future__ import division import errno diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index d287ef283735..c3921bb2e55b 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -17,6 +17,7 @@ """ Integration test for Google Cloud Pub/Sub. """ +from __future__ import absolute_import import logging import unittest diff --git a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py index 407ed8830fa2..0c4535f1a294 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py @@ -18,6 +18,8 @@ Test pipeline for use by pubsub_integration_test. """ +from __future__ import absolute_import + import argparse import apache_beam as beam diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py index 83f21a2a589b..0e5948163f9d 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py @@ -17,6 +17,8 @@ """Unit test for PubSub verifier.""" +from __future__ import absolute_import + import logging import unittest diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py index cce0ac72a33b..a943a12bb4d7 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py @@ -35,6 +35,7 @@ class FakeFile(io.BytesIO): """File object for FakeHdfs""" + __hash__ = None def __init__(self, path, mode='', type='FILE'): io.BytesIO.__init__(self) diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 50439e198177..0514b74ef911 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -20,9 +20,11 @@ from __future__ import absolute_import from __future__ import division +import codecs import logging import math import threading +from builtins import zip from past.builtins import long @@ -402,7 +404,7 @@ def _string_to_int(s, prec): s += '\0' * (prec - len(s)) else: s = s[:prec] - return int(s.encode('hex'), 16) + return int(codecs.encode(s, 'hex'), 16) @staticmethod def _string_from_int(i, prec): @@ -410,4 +412,4 @@ def _string_from_int(i, prec): Inverse of _string_to_int. """ h = '%x' % i - return ('0' * (2 * prec - len(h)) + h).decode('hex') + return codecs.decode('0' * (2 * prec - len(h)) + h, 'hex') diff --git a/sdks/python/apache_beam/io/restriction_trackers.py b/sdks/python/apache_beam/io/restriction_trackers.py index 014125f29006..e7224204fd54 100644 --- a/sdks/python/apache_beam/io/restriction_trackers.py +++ b/sdks/python/apache_beam/io/restriction_trackers.py @@ -42,6 +42,9 @@ def __eq__(self, other): return self.start == other.start and self.stop == other.stop + def __hash__(self): + return hash((type(self), self.start, self.stop)) + def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1): current_split_start = self.start max_split_size = max(desired_num_offsets_per_split, diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py index 2ef7c5b4c729..5561d2c68b03 100644 --- a/sdks/python/apache_beam/io/tfrecordio.py +++ b/sdks/python/apache_beam/io/tfrecordio.py @@ -18,6 +18,7 @@ from __future__ import absolute_import +import codecs import logging import struct from builtins import object @@ -120,24 +121,24 @@ def read_record(cls, file_handle): # Validate all length related payloads. if len(buf) != buf_length_expected: raise ValueError('Not a valid TFRecord. Fewer than %d bytes: %s' % - (buf_length_expected, buf.encode('hex'))) + (buf_length_expected, codecs.encode(buf, 'hex'))) length, length_mask_expected = struct.unpack('