Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/io/concat_source_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

class RangeSource(iobase.BoundedSource):

__hash__ = None

def __init__(self, start, end, split_freq=1):
assert start <= end
self._start = start
Expand Down Expand Up @@ -76,7 +78,9 @@ def read(self, range_tracker):
# For testing
def __eq__(self, other):
return (type(self) == type(other)
and self._start == other._start and self._end == other._end)
and self._start == other._start
and self._end == other._end
and self._split_freq == other._split_freq)

def __ne__(self, other):
return not self == other
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/filebasedsink.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class FileBasedSink(iobase.Sink):

# Max number of threads to be used for renaming.
_MAX_RENAME_THREADS = 64
__hash__ = None

def __init__(self,
file_path_prefix,
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/filebasedsource_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

from __future__ import absolute_import
from __future__ import division

import bz2
import gzip
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import time
from builtins import object
from builtins import round

from apache_beam.io.gcp.datastore.v1 import helper
from apache_beam.io.gcp.datastore.v1 import query_splitter
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

import errno
import logging
import sys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import division

from builtins import range
from builtins import round

from apache_beam.io.gcp.datastore.v1 import helper

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#

"""Tests for util.py."""
from __future__ import absolute_import

import unittest

from apache_beam.io.gcp.datastore.v1 import util
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

"""Unit tests for GCS File System."""

from __future__ import absolute_import

import logging
import unittest
from builtins import zip
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#
"""Tests for Google Cloud Storage client."""
from __future__ import absolute_import
from __future__ import division

import errno
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""
Integration test for Google Cloud Pub/Sub.
"""
from __future__ import absolute_import

import logging
import unittest
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
Test pipeline for use by pubsub_integration_test.
"""

from __future__ import absolute_import

import argparse

import apache_beam as beam
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Unit test for PubSub verifier."""

from __future__ import absolute_import

import logging
import unittest

Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/hadoopfilesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

class FakeFile(io.BytesIO):
"""File object for FakeHdfs"""
__hash__ = None

def __init__(self, path, mode='', type='FILE'):
io.BytesIO.__init__(self)
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/io/range_trackers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from __future__ import absolute_import
from __future__ import division

import codecs
import logging
import math
import threading
from builtins import zip

from past.builtins import long

Expand Down Expand Up @@ -402,12 +404,12 @@ def _string_to_int(s, prec):
s += '\0' * (prec - len(s))
else:
s = s[:prec]
return int(s.encode('hex'), 16)
return int(codecs.encode(s, 'hex'), 16)

@staticmethod
def _string_from_int(i, prec):
"""
Inverse of _string_to_int.
"""
h = '%x' % i
return ('0' * (2 * prec - len(h)) + h).decode('hex')
return codecs.decode('0' * (2 * prec - len(h)) + h, 'hex')
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/restriction_trackers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def __eq__(self, other):

return self.start == other.start and self.stop == other.stop

def __hash__(self):
return hash((type(self), self.start, self.stop))

def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
current_split_start = self.start
max_split_size = max(desired_num_offsets_per_split,
Expand Down
9 changes: 5 additions & 4 deletions sdks/python/apache_beam/io/tfrecordio.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import absolute_import

import codecs
import logging
import struct
from builtins import object
Expand Down Expand Up @@ -120,24 +121,24 @@ def read_record(cls, file_handle):
# Validate all length related payloads.
if len(buf) != buf_length_expected:
raise ValueError('Not a valid TFRecord. Fewer than %d bytes: %s' %
(buf_length_expected, buf.encode('hex')))
(buf_length_expected, codecs.encode(buf, 'hex')))
length, length_mask_expected = struct.unpack('<QI', buf)
length_mask_actual = cls._masked_crc32c(buf[:8])
if length_mask_actual != length_mask_expected:
raise ValueError('Not a valid TFRecord. Mismatch of length mask: %s' %
buf.encode('hex'))
codecs.encode(buf, 'hex'))

# Validate all data related payloads.
buf_length_expected = length + 4
buf = file_handle.read(buf_length_expected)
if len(buf) != buf_length_expected:
raise ValueError('Not a valid TFRecord. Fewer than %d bytes: %s' %
(buf_length_expected, buf.encode('hex')))
(buf_length_expected, codecs.encode(buf, 'hex')))
data, data_mask_expected = struct.unpack('<%dsI' % length, buf)
data_mask_actual = cls._masked_crc32c(data)
if data_mask_actual != data_mask_expected:
raise ValueError('Not a valid TFRecord. Mismatch of data mask: %s' %
buf.encode('hex'))
codecs.encode(buf, 'hex'))

# All validation checks passed.
return data
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/io/vcfio.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from collections import namedtuple

from future.utils import iteritems
from past.builtins import long
from past.builtins import unicode

import vcf
Expand Down Expand Up @@ -72,6 +73,7 @@ class Variant(object):

Each object corresponds to a single record in a VCF file.
"""
__hash__ = None

def __init__(self,
reference_name=None,
Expand Down Expand Up @@ -187,6 +189,8 @@ class VariantCall(object):
variant. It may include associated information such as quality and phasing.
"""

__hash__ = None

def __init__(self, name=None, genotype=None, phaseset=None, info=None):
"""Initialize the :class:`VariantCall` object.

Expand Down Expand Up @@ -407,7 +411,7 @@ def _convert_to_variant_record(self, record, infos, formats):
# Note: this is already done for INFO fields in PyVCF.
if (field in formats and
formats[field].num is None and
isinstance(data, (int, float, int, str, unicode, bool))):
isinstance(data, (int, float, long, str, unicode, bool))):
data = [data]
call.info[field] = data
variant.calls.append(call)
Expand Down
1 change: 1 addition & 0 deletions sdks/python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ modules =
apache_beam/runners
apache_beam/examples
apache_beam/portability
apache_beam/io
apache_beam/internal
apache_beam/metrics
apache_beam/options
Expand Down