diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py index acca89f70f48..4a7e509ed75e 100644 --- a/sdks/python/apache_beam/coders/__init__.py +++ b/sdks/python/apache_beam/coders/__init__.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function from apache_beam.coders.coders import * from apache_beam.coders.typecoders import registry diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 98dd508556a0..dd82a00ab1a0 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -74,7 +74,7 @@ cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE cdef class FastPrimitivesCoderImpl(StreamCoderImpl): cdef CoderImpl fallback_coder_impl - @cython.locals(unicode_value=unicode, dict_value=dict) + @cython.locals(dict_value = dict) cpdef encode_to_stream(self, value, OutputStream stream, bint nested) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index cc7ed87c3ad1..ee58c3fb0431 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -25,12 +25,26 @@ coder_impl.pxd file for type hints. For internal use only; no backwards-compatibility guarantees. 
+ +isort:skip_file """ from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +native_int = int -from types import NoneType +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +from builtins import bytes +from builtins import chr +from builtins import int +from builtins import object +from builtins import range +from builtins import str -import six +from past.builtins import str as old_str +from past.builtins import long +from past.builtins import unicode from apache_beam.coders import observable from apache_beam.utils import windowed_value @@ -38,7 +52,6 @@ from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Timestamp -# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import InputStream as create_InputStream from .stream import OutputStream as create_OutputStream @@ -54,11 +67,6 @@ from .slow_stream import get_varint_size # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -try: - long # Python 2 -except NameError: - long = int # Python 3 - class CoderImpl(object): """For internal use only; no backwards-compatibility guarantees.""" @@ -199,7 +207,7 @@ def __init__(self, coder, step_label): self._step_label = step_label def _check_safe(self, value): - if isinstance(value, (str, six.text_type, long, int, float)): + if isinstance(value, (str, bytes, int, float)): pass elif value is None: pass @@ -279,18 +287,21 @@ def get_estimated_size_and_observables(self, value, nested=False): def encode_to_stream(self, value, stream, nested): t = type(value) - if t is NoneType: + if value is None: stream.write_byte(NONE_TYPE) - elif t is int: + elif t is bool: + stream.write_byte(BOOL_TYPE) + stream.write_byte(value) + elif t is int or t is native_int or t is long: stream.write_byte(INT_TYPE) stream.write_var_int64(value) elif t is float: stream.write_byte(FLOAT_TYPE) 
stream.write_bigendian_double(value) - elif t is str: + elif t is bytes or t is old_str: stream.write_byte(STR_TYPE) stream.write(value, nested) - elif t is six.text_type: + elif t is str or t is unicode: unicode_value = value # for typing stream.write_byte(UNICODE_TYPE) stream.write(unicode_value.encode('utf-8'), nested) @@ -304,12 +315,9 @@ def encode_to_stream(self, value, stream, nested): dict_value = value # for typing stream.write_byte(DICT_TYPE) stream.write_var_int64(len(dict_value)) - for k, v in dict_value.iteritems(): + for k, v in dict_value.items(): self.encode_to_stream(k, stream, True) self.encode_to_stream(v, stream, True) - elif t is bool: - stream.write_byte(BOOL_TYPE) - stream.write_byte(value) else: stream.write_byte(UNKNOWN_TYPE) self.fallback_coder_impl.encode_to_stream(value, stream, nested) @@ -318,6 +326,8 @@ def decode_from_stream(self, stream, nested): t = stream.read_byte() if t == NONE_TYPE: return None + elif t == BOOL_TYPE: + return not not stream.read_byte() elif t == INT_TYPE: return stream.read_var_int64() elif t == FLOAT_TYPE: @@ -341,8 +351,6 @@ def decode_from_stream(self, stream, nested): k = self.decode_from_stream(stream, True) v[k] = self.decode_from_stream(stream, True) return v - elif t == BOOL_TYPE: - return not not stream.read_byte() return self.fallback_coder_impl.decode_from_stream(stream, nested) @@ -394,8 +402,9 @@ def _from_normal_time(self, value): def encode_to_stream(self, value, out, nested): span_micros = value.end.micros - value.start.micros - out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000)) - out.write_var_int64(span_micros / 1000) + out.write_bigendian_uint64( + self._from_normal_time(value.end.micros // 1000)) + out.write_var_int64(span_micros // 1000) def decode_from_stream(self, in_, nested): end_millis = self._to_normal_time(in_.read_bigendian_uint64()) @@ -409,7 +418,7 @@ def estimate_size(self, value, nested=False): # An IntervalWindow is context-insensitive, with a timestamp 
(8 bytes) # and a varint timespam. span = value.end.micros - value.start.micros - return 8 + get_varint_size(span / 1000) + return 8 + get_varint_size(span // 1000) class TimestampCoderImpl(StreamCoderImpl): @@ -427,7 +436,7 @@ def estimate_size(self, unused_value, nested=False): return 8 -small_ints = [chr(_) for _ in range(128)] +small_ints = [chr(_).encode('latin-1') for _ in range(128)] class VarIntCoderImpl(StreamCoderImpl): @@ -783,7 +792,7 @@ def encode_to_stream(self, value, out, nested): # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. self._from_normal_time( - restore_sign * (abs(wv.timestamp_micros) / 1000))) + restore_sign * (abs(wv.timestamp_micros) // 1000))) self._windows_coder.encode_to_stream(wv.windows, out, True) # Default PaneInfo encoded byte representing NO_FIRING. self._pane_info_coder.encode_to_stream(wv.pane_info, out, True) @@ -797,9 +806,9 @@ def decode_from_stream(self, in_stream, nested): # were indeed MIN/MAX timestamps. # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. - if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000): + if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000): timestamp = MIN_TIMESTAMP.micros - elif timestamp == (MAX_TIMESTAMP.micros / 1000): + elif timestamp == (MAX_TIMESTAMP.micros // 1000): timestamp = MAX_TIMESTAMP.micros else: timestamp *= 1000 diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index ecbdd538d38b..b943afd0c2b5 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -20,9 +20,12 @@ Only those coders listed in __all__ are part of the public API of this module. 
""" from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import base64 -import cPickle as pickle +from builtins import object +from builtins import str import google.protobuf from google.protobuf import wrappers_pb2 @@ -33,6 +36,12 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import proto_utils +# This is for py2/3 compatibility. cPickle was renamed pickle in python 3. +try: + import cPickle as pickle # Python 2 +except ImportError: + import pickle # Python 3 + # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import get_varint_size @@ -216,6 +225,9 @@ def __eq__(self, other): and self._dict_without_impl() == other._dict_without_impl()) # pylint: enable=protected-access + def __hash__(self): + return hash(type(self)) + _known_urns = {} @classmethod @@ -309,11 +321,6 @@ class ToStringCoder(Coder): """A default string coder used if no sink coder is specified.""" def encode(self, value): - try: # Python 2 - if isinstance(value, unicode): - return value.encode('utf-8') - except NameError: # Python 3 - pass return str(value) def decode(self, _): diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 705de8920d52..1a32b2df31c9 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -14,11 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import base64 import logging import unittest +from builtins import object from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders @@ -99,6 +102,9 @@ def __eq__(self, other): return True return False + def __hash__(self): + return hash(type(self)) + class FallbackCoderTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 0ea7da2b6ad5..7b709d3d6451 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -17,10 +17,15 @@ """Tests common to all coder implementations.""" from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import math import unittest +from builtins import int +from builtins import range +from builtins import str import dill @@ -40,10 +45,10 @@ class CustomCoder(coders.Coder): def encode(self, x): - return str(x+1) + return str(x + 1).encode('latin-1') def decode(self, encoded): - return int(encoded) - 1 + return int(encoded.decode('latin-1')) - 1 class CodersTest(unittest.TestCase): @@ -103,7 +108,7 @@ def test_custom_coder(self): self.check_coder(CustomCoder(), 1, -10, 5) self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())), - (1, 'a'), (-10, 'b'), (5, 'c')) + (1, b'a'), (-10, b'b'), (5, b'c')) def test_pickle_coder(self): self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) @@ -139,7 +144,7 @@ def test_fast_primitives_coder(self): self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,)) def test_bytes_coder(self): - self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000) + self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000) def test_varint_coder(self): # Small ints. 
@@ -190,7 +195,7 @@ def test_timestamp_coder(self): timestamp.Timestamp(micros=1234567890123456789)) self.check_coder( coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())), - (timestamp.Timestamp.of(27), 'abc')) + (timestamp.Timestamp.of(27), b'abc')) def test_tuple_coder(self): kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())) @@ -206,22 +211,22 @@ def test_tuple_coder(self): kv_coder.as_cloud_object()) # Test binary representation self.assertEqual( - '\x04abc', - kv_coder.encode((4, 'abc'))) + b'\x04abc', + kv_coder.encode((4, b'abc'))) # Test unnested self.check_coder( kv_coder, - (1, 'a'), - (-2, 'a' * 100), - (300, 'abc\0' * 5)) + (1, b'a'), + (-2, b'a' * 100), + (300, b'abc\0' * 5)) # Test nested self.check_coder( coders.TupleCoder( (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())), coders.StrUtf8Coder())), - ((1, 2), 'a'), + ((1, 2), u'a'), ((-2, 5), u'a\u0101' * 100), - ((300, 1), 'abc\0' * 5)) + ((300, 1), u'abc\0' * 5)) def test_tuple_sequence_coder(self): int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder()) @@ -234,7 +239,7 @@ def test_base64_pickle_coder(self): self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) def test_utf8_coder(self): - self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0') + self.check_coder(coders.StrUtf8Coder(), u'a', u'ab\u00FF', u'\u0101\0') def test_iterable_coder(self): iterable_coder = coders.IterableCoder(coders.VarIntCoder()) @@ -322,12 +327,12 @@ def test_windowed_value_coder(self): }, coder.as_cloud_object()) # Test binary representation - self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', + self.assertEqual(b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', coder.encode(window.GlobalWindows.windowed_value(1))) # Test decoding large timestamp self.assertEqual( - coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'), + coder.decode(b'\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'), 
windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),))) # Test unnested @@ -348,7 +353,7 @@ def test_windowed_value_coder(self): coders.WindowedValueCoder(coders.FloatCoder()), coders.WindowedValueCoder(coders.StrUtf8Coder()))), (windowed_value.WindowedValue(1.5, 0, ()), - windowed_value.WindowedValue("abc", 10, ('window',)))) + windowed_value.WindowedValue(u"abc", 10, (u'window',)))) def test_proto_coder(self): # For instructions on how these test proto message were generated, @@ -364,7 +369,7 @@ def test_proto_coder(self): proto_coder = coders.ProtoCoder(ma.__class__) self.check_coder(proto_coder, ma) self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())), - (ma, 'a'), (mb, 'b')) + (ma, b'a'), (mb, b'b')) def test_global_window_coder(self): coder = coders.GlobalWindowCoder() @@ -391,16 +396,16 @@ def test_length_prefix_coder(self): }, coder.as_cloud_object()) # Test binary representation - self.assertEqual('\x00', coder.encode('')) - self.assertEqual('\x01a', coder.encode('a')) - self.assertEqual('\x02bc', coder.encode('bc')) - self.assertEqual('\xff\x7f' + 'z' * 16383, coder.encode('z' * 16383)) + self.assertEqual(b'\x00', coder.encode(b'')) + self.assertEqual(b'\x01a', coder.encode(b'a')) + self.assertEqual(b'\x02bc', coder.encode(b'bc')) + self.assertEqual(b'\xff\x7f' + b'z' * 16383, coder.encode(b'z' * 16383)) # Test unnested - self.check_coder(coder, '', 'a', 'bc', 'def') + self.check_coder(coder, b'', b'a', b'bc', b'def') # Test nested self.check_coder(coders.TupleCoder((coder, coder)), - ('', 'a'), - ('bc', 'def')) + (b'', b'a'), + (b'bc', b'def')) def test_nested_observables(self): class FakeObservableIterator(observable.ObservableMixin): diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py index a13334a2c26f..8cb825769cfe 100644 --- a/sdks/python/apache_beam/coders/fast_coders_test.py +++ b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -16,6 +16,9 @@ # 
"""Unit tests for compiled implementation of coder impls.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import unittest diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py index fc952cf4e559..e512e60b8da0 100644 --- a/sdks/python/apache_beam/coders/observable.py +++ b/sdks/python/apache_beam/coders/observable.py @@ -20,6 +20,11 @@ For internal use only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from builtins import object class ObservableMixin(object): diff --git a/sdks/python/apache_beam/coders/observable_test.py b/sdks/python/apache_beam/coders/observable_test.py index 09ca3041c298..a6aea6335218 100644 --- a/sdks/python/apache_beam/coders/observable_test.py +++ b/sdks/python/apache_beam/coders/observable_test.py @@ -16,6 +16,9 @@ # """Tests for the Observable mixin class.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import unittest diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py index 97aa39ca094f..b4fe0370a69e 100644 --- a/sdks/python/apache_beam/coders/slow_coders_test.py +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -16,6 +16,9 @@ # """Unit tests for uncompiled implementation of coder impls.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import unittest diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 1ab55d90f98d..d497e3ff8110 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -19,8 +19,14 @@ For internal use only; no 
backwards-compatibility guarantees. """ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import struct +from builtins import bytes +from builtins import chr +from builtins import object class OutputStream(object): @@ -32,13 +38,13 @@ def __init__(self): self.data = [] def write(self, b, nested=False): - assert isinstance(b, str) + assert isinstance(b, bytes) if nested: self.write_var_int64(len(b)) self.data.append(b) def write_byte(self, val): - self.data.append(chr(val)) + self.data.append(chr(val).encode('latin-1')) def write_var_int64(self, v): if v < 0: diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index ca13b8093795..66c297f6e38e 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -17,6 +17,8 @@ """Unit tests for coders that must be consistent across all Beam SDKs. 
""" +from __future__ import absolute_import +from __future__ import division from __future__ import print_function import json @@ -24,6 +26,7 @@ import os.path import sys import unittest +from builtins import map import yaml @@ -74,7 +77,7 @@ class StandardCodersTest(unittest.TestCase): lambda x: IntervalWindow( start=Timestamp(micros=(x['end'] - x['span']) * 1000), end=Timestamp(micros=x['end'] * 1000)), - 'urn:beam:coders:stream:0.1': lambda x, parser: map(parser, x), + 'urn:beam:coders:stream:0.1': lambda x, parser: list(map(parser, x)), 'urn:beam:coders:global_window:0.1': lambda x: window.GlobalWindow(), 'urn:beam:coders:windowed_value:0.1': lambda x, value_parser, window_parser: windowed_value.create( diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index ade9b722c6ea..df15c3086320 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -23,7 +23,7 @@ cdef class OutputStream(object): cdef size_t buffer_size cdef size_t pos - cpdef write(self, bytes b, bint nested=*) + cpdef write(self, const unsigned char[:] b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_var_int64(self, libc.stdint.int64_t v) cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v) @@ -39,7 +39,7 @@ cdef class OutputStream(object): cdef class ByteCountingOutputStream(OutputStream): cdef size_t count - cpdef write(self, bytes b, bint nested=*) + cpdef write(self, const unsigned char[:] b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_bigendian_int64(self, libc.stdint.int64_t val) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val) diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index 7c9521a86379..414c294c3b9b 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -39,14 +39,15 @@ cdef class OutputStream(object): if self.data: 
libc.stdlib.free(self.data) - cpdef write(self, bytes b, bint nested=False): + cpdef write(self, const unsigned char[:] b, bint nested=False): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) if self.buffer_size < self.pos + blen: self.extend(blen) - libc.string.memcpy(self.data + self.pos, b, blen) - self.pos += blen + if blen > 0: + libc.string.memcpy(self.data + self.pos, &b[0], blen) + self.pos += blen cpdef write_byte(self, unsigned char val): if self.buffer_size < self.pos + 1: @@ -122,7 +123,7 @@ cdef class ByteCountingOutputStream(OutputStream): def __cinit__(self): self.count = 0 - cpdef write(self, bytes b, bint nested=False): + cpdef write(self, const unsigned char[:] b, bint nested=False): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index 15bc5eb9ba93..674c1730eccb 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -16,10 +16,15 @@ # """Tests for the stream implementations.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import math import unittest +from builtins import int +from builtins import range from apache_beam.coders import slow_stream diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 355c6230f923..92c0c161c6f4 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -63,8 +63,14 @@ def MakeXyzs(v): See apache_beam.typehints.decorators module for more details. 
""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function -import six +from builtins import bytes +from builtins import int +from builtins import object +from builtins import str from apache_beam.coders import coders from apache_beam.typehints import typehints @@ -84,9 +90,8 @@ def register_standard_coders(self, fallback_coder): """Register coders for all basic and composite types.""" self._register_coder_internal(int, coders.VarIntCoder) self._register_coder_internal(float, coders.FloatCoder) - self._register_coder_internal(str, coders.BytesCoder) self._register_coder_internal(bytes, coders.BytesCoder) - self._register_coder_internal(six.text_type, coders.StrUtf8Coder) + self._register_coder_internal(str, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) # Default fallback coders applied in that order until the first matching # coder found. @@ -105,11 +110,21 @@ def register_coder(self, typehint_type, typehint_coder_class): self._register_coder_internal(typehint_type, typehint_coder_class) def get_coder(self, typehint): - coder = self._coders.get( - typehint.__class__ if isinstance(typehint, typehints.TypeConstraint) - else typehint, None) - if isinstance(typehint, typehints.TypeConstraint) and coder is not None: - return coder.from_type_hint(typehint, self) + if isinstance(typehint, typehints.TypeConstraint): + coder = self._coders.get(typehint.__class__) + if coder is not None: + return coder.from_type_hint(typehint, self) + else: + try: + t = typehint() + coder = self._coders.get( + str if isinstance(t, str) + else bytes if isinstance(t, bytes) + else int if isinstance(t, int) and not isinstance(t, bool) + else typehint, None) + except TypeError: + # typehint cannot be instantiated (without arguments) + coder = self._coders.get(typehint, None) if coder is None: # We use the fallback coder when there is no coder registered for a # typehint. 
For example a user defined class with no coder specified.
diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py
index 2b6aa7a51298..fc08eedf1510 100644
--- a/sdks/python/apache_beam/coders/typecoders_test.py
+++ b/sdks/python/apache_beam/coders/typecoders_test.py
@@ -16,8 +16,14 @@
 #
 
 """Unit tests for the typecoders module."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import unittest
+from builtins import int
+from builtins import object
+from builtins import str
 
 from apache_beam.coders import coders
 from apache_beam.coders import typecoders
@@ -33,14 +39,17 @@ def __init__(self, n):
   def __eq__(self, other):
     return self.number == other.number
 
+  def __hash__(self):
+    return self.number
+
 
 class CustomCoder(coders.Coder):
 
   def encode(self, value):
-    return str(value.number)
+    return str(value.number).encode('latin-1')
 
   def decode(self, encoded):
-    return CustomClass(int(encoded))
+    return CustomClass(int(encoded.decode('latin-1')))  # keep number an int
 
   def is_deterministic(self):
     # This coder is deterministic.
Though we don't use need this coder to be diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh index 54795e2c90c3..29cafb6911a9 100755 --- a/sdks/python/generate_pydoc.sh +++ b/sdks/python/generate_pydoc.sh @@ -119,6 +119,9 @@ ignore_identifiers = [ # Ignore broken built-in type references 'tuple', + # Ignore future.builtin type references + 'future.types.newobject.newobject', + # Ignore private classes 'apache_beam.coders.coders._PickleCoderBase', 'apache_beam.coders.coders.FastCoder', diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 89c46ce82975..2505c20f2bb5 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -90,25 +90,6 @@ pushd "$MODULE" isort -p apache_beam --line-width 120 --check-only --order-by-type --combine-star --force-single-line-imports --diff ${SKIP_PARAM} popd -FUTURIZE_EXCLUDED=( - "typehints.py" - "pb2" - "trivial_infernce.py" -) -FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" ) -echo "Checking for files requiring stage 1 refactoring from futurize" -futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored) -futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" || echo "") -count=${#futurize_filtered} -if [ "$count" != "0" ]; then - echo "Some of the changes require futurize stage 1 changes." - echo "The files with required changes:" - echo "$futurize_filtered" - echo "You can run futurize apache_beam to see the proposed changes." 
- exit 1 -fi -echo "No future changes needed" - echo "Checking unittest.main for module ${MODULE}:" TESTS_MISSING_MAIN=$(find ${MODULE} | grep '\.py$' | xargs grep -l '^import unittest$' | xargs grep -L unittest.main) if [ -n "${TESTS_MISSING_MAIN}" ]; then @@ -118,4 +99,4 @@ if [ -n "${TESTS_MISSING_MAIN}" ]; then done echo exit 1 -fi +fi \ No newline at end of file diff --git a/sdks/python/run_pylint_2to3.sh b/sdks/python/run_pylint_2to3.sh new file mode 100644 index 000000000000..63df49d40ece --- /dev/null +++ b/sdks/python/run_pylint_2to3.sh @@ -0,0 +1,90 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script will run pylint with the --py3k parameter to check for python +# 3 compatibility. This script can run on a list of modules provided as +# command line arguments. +# +# The exit-code of the script indicates success or a failure. 
+
+set -o errexit
+set -o pipefail
+
+DEFAULT_MODULE=apache_beam
+
+usage(){ echo "Usage: $0 [MODULE|--help]
+# The default MODULE is $DEFAULT_MODULE"; }
+
+# Collect MODULE arguments; fall back to the default when none are given.
+MODULES=()
+while [[ $# -gt 0 ]] ; do
+  case "$1" in
+    --help) usage; exit 1;;
+    *) MODULES+=("$1"); shift;;
+  esac
+done
+if [ ${#MODULES[@]} -eq 0 ] ; then
+  MODULES=("${DEFAULT_MODULE}")
+fi
+
+# Extended-regex alternatives; matching "Refactored" lines are not reported.
+FUTURIZE_EXCLUDED=(
+  "typehints.py"
+  "pb2"
+  "trivial_inference.py"
+)
+FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${FUTURIZE_EXCLUDED[*]}" )
+echo "Checking for files requiring stage 1 refactoring from futurize"
+# grep exits 1 when it selects nothing; '|| true' stops errexit aborting here.
+futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 \
+  | grep Refactored || true)
+futurize_filtered=$(echo "$futurize_results" \
+  | grep -Ev "$FUTURIZE_GREP_PARAM" || true)
+if [ -n "$futurize_filtered" ]; then
+  echo "Some of the changes require futurize stage 1 changes."
+  echo "The files with required changes:"
+  echo "$futurize_filtered"
+  echo "You can run futurize apache_beam to see the proposed changes."
+  exit 1
+fi
+echo "No future changes needed"
+
+# Following generated files are excluded from lint checks.
+EXCLUDED_GENERATED_FILES=( +"apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py" +"apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py" +"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py" +"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py" +"apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py" +"apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py" +"apache_beam/coders/proto2_coder_test_messages_pb2.py" +apache_beam/portability/api/*pb2*.py +) + +FILES_TO_IGNORE="" +for file in "${EXCLUDED_GENERATED_FILES[@]}"; do + if test -z "$FILES_TO_IGNORE" + then FILES_TO_IGNORE="$(basename $file)" + else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)" + fi +done +echo "Skipping lint for generated files: $FILES_TO_IGNORE" + +echo "Running pylint --py3k for modules $( printf "%s " "${MODULES[@]}" ):" +pylint -j8 $( printf "%s " "${MODULES[@]}" ) \ + --ignore-patterns="$FILES_TO_IGNORE" --py3k \ No newline at end of file diff --git a/sdks/python/setup.py b/sdks/python/setup.py index b7f400e739e2..86d366211875 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -69,7 +69,7 @@ def get_version(): ) -REQUIRED_CYTHON_VERSION = '0.26.1' +REQUIRED_CYTHON_VERSION = '0.28.1' try: _CYTHON_VERSION = get_distribution('cython').version if StrictVersion(_CYTHON_VERSION) < StrictVersion(REQUIRED_CYTHON_VERSION): diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index ff88ac42fa83..aa7e4ca146e9 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -17,7 +17,7 @@ [tox] # new environments will be excluded by default unless explicitly added to envlist. 
-envlist = py27,py27-{gcp,cython,lint},py3-lint,docs +envlist = py27,py27-{gcp,cython,lint,lint3},py3-lint,docs toxworkdir = {toxinidir}/target/.tox [pycodestyle] @@ -35,7 +35,8 @@ whitelist_externals = time deps = grpcio-tools==1.3.5 - cython: cython==0.26.1 + cython: cython==0.28.1 + future==0.16.0 # These 2 magic command overrides are required for Jenkins builds. # Otherwise we get "OSError: [Errno 2] No such file or directory" errors. @@ -59,6 +60,9 @@ commands = # `platform = linux2|darwin|...` # See https://docs.python.org/2/library/sys.html#sys.platform for platform codes platform = linux2 +deps = + cython==0.28.1 + future==0.16.0 commands = python --version pip --version @@ -89,6 +93,21 @@ commands = pip --version time {toxinidir}/run_pylint.sh +[testenv:py27-lint3] +deps = + pycodestyle==2.3.1 + pylint==1.7.2 + future==0.16.0 + isort==4.2.15 + flake8==3.5.0 +modules = + apache_beam/coders +commands = + python --version + pip --version + time {toxinidir}/run_pylint_2to3.sh {[testenv:py27-lint3]modules} + + [testenv:py3-lint] deps = pycodestyle==2.3.1