From 3b53cfe4aa4313df22796d112a98988ebd9fa042 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Thu, 29 Mar 2018 11:02:29 +0200 Subject: [PATCH 01/19] Add pylint2 --py3k check per updated subpackage --- sdks/python/run_pylint.sh | 79 ++++++++++++++++++++++++++------------- sdks/python/tox.ini | 21 ++++++++++- 2 files changed, 71 insertions(+), 29 deletions(-) diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 89c46ce82975..27d65c2d2119 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -26,16 +26,26 @@ set -o errexit set -o pipefail -MODULE=apache_beam +DEFAULT_MODULE=apache_beam -usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; } +usage(){ echo "Usage: $0 [MODULE|--help] +# The default MODULE is $DEFAULT_MODULE"; } -if test $# -gt 0; then - case "$@" in +MODULES=${DEFAULT_MODULE} +PY3K=false +while [[ $# -gt 0 ]] ; do + key="$1" + case ${key} in --help) usage; exit 1;; - *) MODULE="$*";; + --py3k) PY3K=true; shift;; + *) + if [ ${MODULES} = ${DEFAULT_MODULE} ] ; then + MODULES=() + fi + MODULES+=("$1") + shift;; esac -fi +done # Following generated files are excluded from lint checks. EXCLUDED_GENERATED_FILES=( @@ -58,15 +68,22 @@ for file in "${EXCLUDED_GENERATED_FILES[@]}"; do done echo "Skipping lint for generated files: $FILES_TO_IGNORE" -echo "Running pylint for module $MODULE:" -pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE" -echo "Running pycodestyle for module $MODULE:" -pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE" -echo "Running flake8 for module $MODULE:" +echo "Running pylint for modules $( printf "%s " "${MODULES[@]}" ):" +pylint -j8 $( printf "%s " "${MODULES[@]}" ) \ + --ignore-patterns="$FILES_TO_IGNORE" \ + $( [ "$PY3K" = true ] && printf %s '--py3k' ) \ + || (echo "Please execute futurize stage 2 to remain python 3 compatible." 
+ echo + exit 1) + +echo "Running pycodestyle for modules$( printf "%s " "${MODULES[@]}" ):" +pycodestyle $( printf "%s " "${MODULES[@]}" ) --exclude="$FILES_TO_IGNORE" +echo "Running flake8 for modules $( printf "%s " "${MODULES[@]}" ):" # TODO(BEAM-3959): Add F821 (undefined names) as soon as that test passes -flake8 $MODULE --count --select=E9,F822,F823 --show-source --statistics +flake8 $( printf "%s " "${MODULES[@]}" ) --count --select=E9,F822,F823 \ + --show-source --statistics -echo "Running isort for module $MODULE:" +echo "Running isort for modules $( printf "%s " "${MODULES[@]}" ):" # Skip files where isort is behaving weirdly ISORT_EXCLUDED=( "apiclient.py" @@ -86,9 +103,13 @@ done for file in "${EXCLUDED_GENERATED_FILES[@]}"; do SKIP_PARAM="$SKIP_PARAM --skip $(basename $file)" done -pushd "$MODULE" -isort -p apache_beam --line-width 120 --check-only --order-by-type --combine-star --force-single-line-imports --diff ${SKIP_PARAM} -popd + +for module in "$MODULES"; do + pushd "$module" + isort -p apache_beam --line-width 120 --check-only --order-by-type \ + --combine-star --force-single-line-imports --diff ${SKIP_PARAM} + popd +done FUTURIZE_EXCLUDED=( "typehints.py" @@ -98,7 +119,8 @@ FUTURIZE_EXCLUDED=( FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" ) echo "Checking for files requiring stage 1 refactoring from futurize" futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored) -futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" || echo "") +futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" \ + || echo "") count=${#futurize_filtered} if [ "$count" != "0" ]; then echo "Some of the changes require futurize stage 1 changes." @@ -109,13 +131,16 @@ if [ "$count" != "0" ]; then fi echo "No future changes needed" -echo "Checking unittest.main for module ${MODULE}:" -TESTS_MISSING_MAIN=$(find ${MODULE} | grep '\.py$' | xargs grep -l '^import unittest$' | xargs grep -L unittest.main) -if [ -n "${TESTS_MISSING_MAIN}" ]; then - echo -e "\nThe following files are missing a call to unittest.main():" - for FILE in ${TESTS_MISSING_MAIN}; do - echo " ${FILE}" - done - echo - exit 1 -fi +echo "Checking unittest.main for modules $( printf "%s " "${MODULES[@]}" ):" +for module in "$MODULES"; do + TESTS_MISSING_MAIN=$(find ${module} | grep '\.py$' | xargs grep -l \ + '^import unittest$' | xargs grep -L unittest.main) + if [ -n "${TESTS_MISSING_MAIN}" ]; then + echo -e "\nThe following files are missing a call to unittest.main():" + for FILE in ${TESTS_MISSING_MAIN}; do + echo " ${FILE}" + done + echo + exit 1 + fi +done \ No newline at end of file diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index ff88ac42fa83..677076da59da 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -17,7 +17,7 @@ [tox] # new environments will be excluded by default unless explicitly added to envlist. -envlist = py27,py27-{gcp,cython,lint},py3-lint,docs +envlist = py27,py27-{gcp,cython,lint,lint3},py3-lint,docs toxworkdir = {toxinidir}/target/.tox [pycodestyle] @@ -35,7 +35,7 @@ whitelist_externals = time deps = grpcio-tools==1.3.5 - cython: cython==0.26.1 + cython: cython==0.28.1 # These 2 magic command overrides are required for Jenkins builds. # Otherwise we get "OSError: [Errno 2] No such file or directory" errors. 
@@ -59,6 +59,8 @@ commands = # `platform = linux2|darwin|...` # See https://docs.python.org/2/library/sys.html#sys.platform for platform codes platform = linux2 +deps = + cython==0.28.1 commands = python --version pip --version @@ -89,6 +91,21 @@ commands = pip --version time {toxinidir}/run_pylint.sh +[testenv:py27-lint3] +deps = + pycodestyle==2.3.1 + pylint==1.7.2 + future==0.16.0 + isort==4.2.15 + flake8==3.5.0 +modules = + apache_beam/coders +commands = + python --version + pip --version + time {toxinidir}/run_pylint.sh --py3k {[testenv:py27-lint3]modules} + + [testenv:py3-lint] deps = pycodestyle==2.3.1 From 5aca33a95534ef1a99fa507442dc79603317cf52 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Fri, 30 Mar 2018 19:31:22 +0200 Subject: [PATCH 02/19] Futurize and fix coder_impl.py --- sdks/python/apache_beam/coders/coder_impl.py | 62 ++++++++++---------- sdks/python/run_pylint.sh | 5 +- sdks/python/setup.py | 2 +- sdks/python/tox.ini | 2 + 4 files changed, 35 insertions(+), 36 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index cc7ed87c3ad1..0c7d84b5be2b 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -27,10 +27,14 @@ For internal use only; no backwards-compatibility guarantees. """ from __future__ import absolute_import +from __future__ import division +from __future__ import print_function -from types import NoneType - -import six +from builtins import chr +from builtins import int +from builtins import object +from builtins import range +from builtins import str from apache_beam.coders import observable from apache_beam.utils import windowed_value @@ -54,11 +58,6 @@ from .slow_stream import get_varint_size # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -try: - long # Python 2 -except NameError: - long = int # Python 3 - class CoderImpl(object): """For internal use only; no backwards-compatibility guarantees.""" @@ -199,7 +198,7 @@ def __init__(self, coder, step_label): self._step_label = step_label def _check_safe(self, value): - if isinstance(value, (str, six.text_type, long, int, float)): + if isinstance(value, (str, bytes, int, float)): pass elif value is None: pass @@ -278,38 +277,38 @@ def get_estimated_size_and_observables(self, value, nested=False): return out.get_count(), [] def encode_to_stream(self, value, stream, nested): - t = type(value) - if t is NoneType: + if value is None: stream.write_byte(NONE_TYPE) - elif t is int: + elif isinstance(value, bool): + stream.write_byte(BOOL_TYPE) + stream.write_byte(value) + elif isinstance(value, int): stream.write_byte(INT_TYPE) stream.write_var_int64(value) - elif t is float: + elif isinstance(value, float): stream.write_byte(FLOAT_TYPE) stream.write_bigendian_double(value) - elif t is str: + elif isinstance(value, bytes): stream.write_byte(STR_TYPE) stream.write(value, nested) - elif t is six.text_type: + elif isinstance(value, str): unicode_value = value # for typing stream.write_byte(UNICODE_TYPE) stream.write(unicode_value.encode('utf-8'), nested) - elif t is list or t is tuple or t is set: + elif isinstance(value, (list, tuple, set)): stream.write_byte( - LIST_TYPE if t is list else TUPLE_TYPE if t is tuple else SET_TYPE) + LIST_TYPE if isinstance(value, list) else TUPLE_TYPE if + isinstance(value, tuple) else SET_TYPE) stream.write_var_int64(len(value)) for e in value: self.encode_to_stream(e, stream, True) - elif t is dict: + elif isinstance(value, 
dict): dict_value = value # for typing stream.write_byte(DICT_TYPE) stream.write_var_int64(len(dict_value)) - for k, v in dict_value.iteritems(): + for k, v in dict_value.items(): self.encode_to_stream(k, stream, True) self.encode_to_stream(v, stream, True) - elif t is bool: - stream.write_byte(BOOL_TYPE) - stream.write_byte(value) else: stream.write_byte(UNKNOWN_TYPE) self.fallback_coder_impl.encode_to_stream(value, stream, nested) @@ -318,6 +317,8 @@ def decode_from_stream(self, stream, nested): t = stream.read_byte() if t == NONE_TYPE: return None + elif t == BOOL_TYPE: + return not not stream.read_byte() elif t == INT_TYPE: return stream.read_var_int64() elif t == FLOAT_TYPE: @@ -341,8 +342,6 @@ def decode_from_stream(self, stream, nested): k = self.decode_from_stream(stream, True) v[k] = self.decode_from_stream(stream, True) return v - elif t == BOOL_TYPE: - return not not stream.read_byte() return self.fallback_coder_impl.decode_from_stream(stream, nested) @@ -394,8 +393,9 @@ def _from_normal_time(self, value): def encode_to_stream(self, value, out, nested): span_micros = value.end.micros - value.start.micros - out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000)) - out.write_var_int64(span_micros / 1000) + out.write_bigendian_uint64( + self._from_normal_time(value.end.micros // 1000)) + out.write_var_int64(span_micros // 1000) def decode_from_stream(self, in_, nested): end_millis = self._to_normal_time(in_.read_bigendian_uint64()) @@ -409,7 +409,7 @@ def estimate_size(self, value, nested=False): # An IntervalWindow is context-insensitive, with a timestamp (8 bytes) # and a varint timespam. span = value.end.micros - value.start.micros - return 8 + get_varint_size(span / 1000) + return 8 + get_varint_size(span // 1000) class TimestampCoderImpl(StreamCoderImpl): @@ -427,7 +427,7 @@ def estimate_size(self, unused_value, nested=False): return 8 -small_ints = [chr(_) for _ in range(128)] +small_ints = [chr(_).encode('latin-1') for _ in range(128)] class VarIntCoderImpl(StreamCoderImpl): @@ -783,7 +783,7 @@ def encode_to_stream(self, value, out, nested): # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. self._from_normal_time( - restore_sign * (abs(wv.timestamp_micros) / 1000))) + restore_sign * (abs(wv.timestamp_micros) // 1000))) self._windows_coder.encode_to_stream(wv.windows, out, True) # Default PaneInfo encoded byte representing NO_FIRING. self._pane_info_coder.encode_to_stream(wv.pane_info, out, True) @@ -797,9 +797,9 @@ def decode_from_stream(self, in_stream, nested): # were indeed MIN/MAX timestamps. # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on # precision of timestamps. 
- if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000): + if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000): timestamp = MIN_TIMESTAMP.micros - elif timestamp == (MAX_TIMESTAMP.micros / 1000): + elif timestamp == (MAX_TIMESTAMP.micros // 1000): timestamp = MAX_TIMESTAMP.micros else: timestamp *= 1000 diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 27d65c2d2119..ccf37db8bc74 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -71,10 +71,7 @@ echo "Skipping lint for generated files: $FILES_TO_IGNORE" echo "Running pylint for modules $( printf "%s " "${MODULES[@]}" ):" pylint -j8 $( printf "%s " "${MODULES[@]}" ) \ --ignore-patterns="$FILES_TO_IGNORE" \ - $( [ "$PY3K" = true ] && printf %s '--py3k' ) \ - || (echo "Please execute futurize stage 2 to remain python 3 compatible." - echo - exit 1) + $( [ "$PY3K" = true ] && printf %s '--py3k' ) echo "Running pycodestyle for modules$( printf "%s " "${MODULES[@]}" ):" pycodestyle $( printf "%s " "${MODULES[@]}" ) --exclude="$FILES_TO_IGNORE" diff --git a/sdks/python/setup.py b/sdks/python/setup.py index b7f400e739e2..86d366211875 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -69,7 +69,7 @@ def get_version(): ) -REQUIRED_CYTHON_VERSION = '0.26.1' +REQUIRED_CYTHON_VERSION = '0.28.1' try: _CYTHON_VERSION = get_distribution('cython').version if StrictVersion(_CYTHON_VERSION) < StrictVersion(REQUIRED_CYTHON_VERSION): diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 677076da59da..314f86aa2a6f 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -36,6 +36,7 @@ whitelist_externals = deps = grpcio-tools==1.3.5 cython: cython==0.28.1 + future==0.16.0 # These 2 magic command overrides are required for Jenkins builds. # Otherwise we get "OSError: [Errno 2] No such file or directory" errors. @@ -61,6 +62,7 @@ commands = platform = linux2 deps = cython==0.28.1 + future==0.16.0 commands = python --version pip --version From 267aa26f2fcca8b48aa4d795aa620235689846df Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Fri, 30 Mar 2018 23:36:41 +0200 Subject: [PATCH 03/19] Futurize and fix coders.py --- sdks/python/apache_beam/coders/coders.py | 18 ++++++++++++------ sdks/python/generate_pydoc.sh | 3 +++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index ecbdd538d38b..f9ea076f97ce 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -20,9 +20,12 @@ Only those coders listed in __all__ are part of the public API of this module. 
""" from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import base64 -import cPickle as pickle +from builtins import object +from builtins import str import google.protobuf from google.protobuf import wrappers_pb2 @@ -33,6 +36,11 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import proto_utils +try: + import cPickle as pickle +except ImportError: + import pickle + # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import get_varint_size @@ -216,6 +224,9 @@ def __eq__(self, other): and self._dict_without_impl() == other._dict_without_impl()) # pylint: enable=protected-access + def __hash__(self): + return hash(type(self)) + _known_urns = {} @classmethod @@ -309,11 +320,6 @@ class ToStringCoder(Coder): """A default string coder used if no sink coder is specified.""" def encode(self, value): - try: # Python 2 - if isinstance(value, unicode): - return value.encode('utf-8') - except NameError: # Python 3 - pass return str(value) def decode(self, _): diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh index 54795e2c90c3..29cafb6911a9 100755 --- a/sdks/python/generate_pydoc.sh +++ b/sdks/python/generate_pydoc.sh @@ -119,6 +119,9 @@ ignore_identifiers = [ # Ignore broken built-in type references 'tuple', + # Ignore future.builtin type references + 'future.types.newobject.newobject', + # Ignore private classes 'apache_beam.coders.coders._PickleCoderBase', 'apache_beam.coders.coders.FastCoder', From 74e2e53f99bb43fc4e8be79c4194d17106dd087a Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 01:14:10 +0200 Subject: [PATCH 04/19] Add future imports to __init__ so pylint doesn't complain --- sdks/python/apache_beam/coders/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py index acca89f70f48..4a7e509ed75e 100644 --- a/sdks/python/apache_beam/coders/__init__.py +++ b/sdks/python/apache_beam/coders/__init__.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function from apache_beam.coders.coders import * from apache_beam.coders.typecoders import registry From 5825249b9908df62881155f23e6ad154f2678d62 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 02:00:43 +0200 Subject: [PATCH 05/19] futurize and fix all non-test modules in package --- sdks/python/apache_beam/coders/observable.py | 5 +++++ sdks/python/apache_beam/coders/slow_stream.py | 10 ++++++++-- sdks/python/apache_beam/coders/typecoders.py | 9 ++++++--- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py index fc952cf4e559..e512e60b8da0 100644 --- a/sdks/python/apache_beam/coders/observable.py +++ b/sdks/python/apache_beam/coders/observable.py @@ -20,6 +20,11 @@ For internal use only; no backwards-compatibility guarantees. 
""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from builtins import object class ObservableMixin(object): diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 1ab55d90f98d..d497e3ff8110 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -19,8 +19,14 @@ For internal use only; no backwards-compatibility guarantees. """ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import struct +from builtins import bytes +from builtins import chr +from builtins import object class OutputStream(object): @@ -32,13 +38,13 @@ def __init__(self): self.data = [] def write(self, b, nested=False): - assert isinstance(b, str) + assert isinstance(b, bytes) if nested: self.write_var_int64(len(b)) self.data.append(b) def write_byte(self, val): - self.data.append(chr(val)) + self.data.append(chr(val).encode('latin-1')) def write_var_int64(self, v): if v < 0: diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 355c6230f923..c8eafeaf79c3 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -63,8 +63,12 @@ def MakeXyzs(v): See apache_beam.typehints.decorators module for more details. """ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function -import six +from builtins import object +from builtins import str from apache_beam.coders import coders from apache_beam.typehints import typehints @@ -84,9 +88,8 @@ def register_standard_coders(self, fallback_coder): """Register coders for all basic and composite types.""" self._register_coder_internal(int, coders.VarIntCoder) self._register_coder_internal(float, coders.FloatCoder) - self._register_coder_internal(str, coders.BytesCoder) self._register_coder_internal(bytes, coders.BytesCoder) - self._register_coder_internal(six.text_type, coders.StrUtf8Coder) + self._register_coder_internal(str, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) # Default fallback coders applied in that order until the first matching # coder found. From 18f9645f0a69709b8d71398e09f6fe8b212d33a1 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 02:39:51 +0200 Subject: [PATCH 06/19] Futurize and fix coders_test.py --- sdks/python/apache_beam/coders/coders_test.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 705de8920d52..1a32b2df31c9 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -14,11 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import base64 import logging import unittest +from builtins import object from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders @@ -99,6 +102,9 @@ def __eq__(self, other): return True return False + def __hash__(self): + return hash(type(self)) + class FallbackCoderTest(unittest.TestCase): From 89d854b7303aac02d2f0f9f775e5c6033b9e6b5e Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 02:44:04 +0200 Subject: [PATCH 07/19] Add future imports to multiple modules --- sdks/python/apache_beam/coders/fast_coders_test.py | 3 +++ sdks/python/apache_beam/coders/observable_test.py | 3 +++ sdks/python/apache_beam/coders/slow_coders_test.py | 3 +++ 3 files changed, 9 insertions(+) diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py index a13334a2c26f..8cb825769cfe 100644 --- a/sdks/python/apache_beam/coders/fast_coders_test.py +++ b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -16,6 +16,9 @@ # """Unit tests for compiled implementation of coder impls.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import unittest diff --git a/sdks/python/apache_beam/coders/observable_test.py b/sdks/python/apache_beam/coders/observable_test.py index 09ca3041c298..a6aea6335218 100644 --- a/sdks/python/apache_beam/coders/observable_test.py +++ b/sdks/python/apache_beam/coders/observable_test.py @@ -16,6 +16,9 @@ # """Tests for the Observable mixin class.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import unittest diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py index 97aa39ca094f..b4fe0370a69e 100644 --- a/sdks/python/apache_beam/coders/slow_coders_test.py +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -16,6 +16,9 @@ # """Unit tests for uncompiled implementation of coder impls.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import unittest From 64cb8e3bec90132ce60d91bd522e392cec035722 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 08:28:30 +0200 Subject: [PATCH 08/19] Futurize and fix standard_coders_test.py --- sdks/python/apache_beam/coders/standard_coders_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index ca13b8093795..66c297f6e38e 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -17,6 +17,8 @@ """Unit tests for coders that must be consistent across all Beam SDKs. 
""" +from __future__ import absolute_import +from __future__ import division from __future__ import print_function import json @@ -24,6 +26,7 @@ import os.path import sys import unittest +from builtins import map import yaml @@ -74,7 +77,7 @@ class StandardCodersTest(unittest.TestCase): lambda x: IntervalWindow( start=Timestamp(micros=(x['end'] - x['span']) * 1000), end=Timestamp(micros=x['end'] * 1000)), - 'urn:beam:coders:stream:0.1': lambda x, parser: map(parser, x), + 'urn:beam:coders:stream:0.1': lambda x, parser: list(map(parser, x)), 'urn:beam:coders:global_window:0.1': lambda x: window.GlobalWindow(), 'urn:beam:coders:windowed_value:0.1': lambda x, value_parser, window_parser: windowed_value.create( From 4d967413dcc7be4d3cc2a23f70b4a8c9c0c9bd00 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 09:31:13 +0200 Subject: [PATCH 09/19] Futurize and fix stream_test.py --- sdks/python/apache_beam/coders/stream_test.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index 15bc5eb9ba93..674c1730eccb 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -16,10 +16,15 @@ # """Tests for the stream implementations.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import math import unittest +from builtins import int +from builtins import range from apache_beam.coders import slow_stream From 4c5920781ea377efa2105a0a1b83baf80f76d942 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 22:14:39 +0200 Subject: [PATCH 10/19] Futurize and fix proto2_coder_test_messages_pb2.py --- .../coders/proto2_coder_test_messages_pb2.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py b/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py index 433d33f98841..6189c1feedb7 100644 --- a/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py +++ b/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py @@ -17,16 +17,15 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: sdks/java/core/src/main/proto/proto2_coder_test_messages.proto - -import sys +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database from google.protobuf import descriptor_pb2 - -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -36,7 +35,7 @@ name='apache_beam/coders/proto2_coder_test_messages.proto', package='proto2_coder_test_messages', syntax='proto2', - serialized_pb=_b('\n3apache_beam/coders/proto2_coder_test_messages.proto\x12\x1aproto2_coder_test_messages\"P\n\x08MessageA\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\t\x12\x34\n\x06\x66ield2\x18\x02 \x03(\x0b\x32$.proto2_coder_test_messages.MessageB\"\x1a\n\x08MessageB\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\x08\"\x10\n\x08MessageC*\x04\x08\x64\x10j\"\xad\x01\n\x0eMessageWithMap\x12\x46\n\x06\x66ield1\x18\x01 \x03(\x0b\x32\x36.proto2_coder_test_messages.MessageWithMap.Field1Entry\x1aS\n\x0b\x46ield1Entry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:\x02\x38\x01\"V\n\x18ReferencesMessageWithMap\x12:\n\x06\x66ield1\x18\x01 \x03(\x0b\x32*.proto2_coder_test_messages.MessageWithMap:Z\n\x06\x66ield1\x12$.proto2_coder_test_messages.MessageC\x18\x65 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:Z\n\x06\x66ield2\x12$.proto2_coder_test_messages.MessageC\x18\x66 \x01(\x0b\x32$.proto2_coder_test_messages.MessageBB\x1c\n\x1aorg.apache.beam.sdk.coders') + serialized_pb=b'\n3apache_beam/coders/proto2_coder_test_messages.proto\x12\x1aproto2_coder_test_messages\"P\n\x08MessageA\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\t\x12\x34\n\x06\x66ield2\x18\x02 \x03(\x0b\x32$.proto2_coder_test_messages.MessageB\"\x1a\n\x08MessageB\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\x08\"\x10\n\x08MessageC*\x04\x08\x64\x10j\"\xad\x01\n\x0eMessageWithMap\x12\x46\n\x06\x66ield1\x18\x01 \x03(\x0b\x32\x36.proto2_coder_test_messages.MessageWithMap.Field1Entry\x1aS\n\x0b\x46ield1Entry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:\x02\x38\x01\"V\n\x18ReferencesMessageWithMap\x12:\n\x06\x66ield1\x18\x01 \x03(\x0b\x32*.proto2_coder_test_messages.MessageWithMap:Z\n\x06\x66ield1\x12$.proto2_coder_test_messages.MessageC\x18\x65 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:Z\n\x06\x66ield2\x12$.proto2_coder_test_messages.MessageC\x18\x66 \x01(\x0b\x32$.proto2_coder_test_messages.MessageBB\x1c\n\x1aorg.apache.beam.sdk.coders' ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -69,7 +68,7 @@ _descriptor.FieldDescriptor( name='field1', full_name='proto2_coder_test_messages.MessageA.field1', index=0, number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), + has_default_value=False, default_value=u"", message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), @@ -162,7 +161,7 @@ _descriptor.FieldDescriptor( name='key', full_name='proto2_coder_test_messages.MessageWithMap.Field1Entry.key', index=0, number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), + has_default_value=False, default_value=u"", message_type=None, enum_type=None, 
containing_type=None, is_extension=False, extension_scope=None, options=None), @@ -179,7 +178,8 @@ nested_types=[], enum_types=[ ], - options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), + b'8\001'), is_extendable=False, syntax='proto2', extension_ranges=[], @@ -312,7 +312,9 @@ MessageC.RegisterExtension(field2) DESCRIPTOR.has_options = True -DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\032org.apache.beam.sdk.coders')) +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), + b'\n\032org.apache.beam.sdk.coders') _MESSAGEWITHMAP_FIELD1ENTRY.has_options = True -_MESSAGEWITHMAP_FIELD1ENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) +_MESSAGEWITHMAP_FIELD1ENTRY._options = _descriptor._ParseOptions( + descriptor_pb2.MessageOptions(), b'8\001') # @@protoc_insertion_point(module_scope) From 6ffaf75e695a59904dceecd56a1febc205d6f4a4 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Sun, 1 Apr 2018 23:18:02 +0200 Subject: [PATCH 11/19] Futurize and fix typecoders_test.py --- sdks/python/apache_beam/coders/typecoders_test.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py index 2b6aa7a51298..3e01f896b276 100644 --- a/sdks/python/apache_beam/coders/typecoders_test.py +++ b/sdks/python/apache_beam/coders/typecoders_test.py @@ -16,8 +16,14 @@ # """Unit tests for the typecoders module.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import unittest +from builtins import int +from builtins import object +from builtins import str from apache_beam.coders import coders from apache_beam.coders import typecoders @@ -33,14 +39,18 @@ def __init__(self, n): def __eq__(self, other): return self.number == other.number + def __hash__(self): + return self.number + class CustomCoder(coders.Coder): def encode(self, value): - return str(value.number) + x = value.number + return int(x).to_bytes((x.bit_length() + 7) // 8, 'big', signed=True) def decode(self, encoded): - return CustomClass(int(encoded)) + return CustomClass(int.from_bytes(encoded, 'big', signed=True) - 1) def is_deterministic(self): # This coder is deterministic. 
Though we don't use need this coder to be From 32ffa526d633d864f1f6ac0f997760bd44e6d161 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Mon, 2 Apr 2018 16:25:52 +0200 Subject: [PATCH 12/19] Futurize and fix coders_test_common.py Also remove bytes type from stream.pyx due to imcompatibility with future.builtins.bytes --- .../apache_beam/coders/coders_test_common.py | 53 ++++++++++--------- sdks/python/apache_beam/coders/stream.pxd | 4 +- sdks/python/apache_beam/coders/stream.pyx | 4 +- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 0ea7da2b6ad5..e24eddadc8d6 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -17,10 +17,14 @@ """Tests common to all coder implementations.""" from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import logging import math import unittest +from builtins import int +from builtins import range import dill @@ -40,10 +44,11 @@ class CustomCoder(coders.Coder): def encode(self, x): - return str(x+1) + x = x + 1 + return int(x).to_bytes((x.bit_length() + 7) // 8, 'big', signed=True) def decode(self, encoded): - return int(encoded) - 1 + return int.from_bytes(encoded, 'big', signed=True) - 1 class CodersTest(unittest.TestCase): @@ -103,7 +108,7 @@ def test_custom_coder(self): self.check_coder(CustomCoder(), 1, -10, 5) self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())), - (1, 'a'), (-10, 'b'), (5, 'c')) + (1, b'a'), (-10, b'b'), (5, b'c')) def test_pickle_coder(self): self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) @@ -139,7 +144,7 @@ def test_fast_primitives_coder(self): self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,)) def test_bytes_coder(self): - self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000) + self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000) def test_varint_coder(self): # Small ints. 
@@ -190,7 +195,7 @@ def test_timestamp_coder(self): timestamp.Timestamp(micros=1234567890123456789)) self.check_coder( coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())), - (timestamp.Timestamp.of(27), 'abc')) + (timestamp.Timestamp.of(27), b'abc')) def test_tuple_coder(self): kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())) @@ -206,22 +211,22 @@ def test_tuple_coder(self): kv_coder.as_cloud_object()) # Test binary representation self.assertEqual( - '\x04abc', - kv_coder.encode((4, 'abc'))) + b'\x04abc', + kv_coder.encode((4, b'abc'))) # Test unnested self.check_coder( kv_coder, - (1, 'a'), - (-2, 'a' * 100), - (300, 'abc\0' * 5)) + (1, b'a'), + (-2, b'a' * 100), + (300, b'abc\0' * 5)) # Test nested self.check_coder( coders.TupleCoder( (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())), coders.StrUtf8Coder())), - ((1, 2), 'a'), + ((1, 2), u'a'), ((-2, 5), u'a\u0101' * 100), - ((300, 1), 'abc\0' * 5)) + ((300, 1), u'abc\0' * 5)) def test_tuple_sequence_coder(self): int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder()) @@ -234,7 +239,7 @@ def test_base64_pickle_coder(self): self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) def test_utf8_coder(self): - self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0') + self.check_coder(coders.StrUtf8Coder(), u'a', u'ab\u00FF', u'\u0101\0') def test_iterable_coder(self): iterable_coder = coders.IterableCoder(coders.VarIntCoder()) @@ -322,12 +327,12 @@ def test_windowed_value_coder(self): }, coder.as_cloud_object()) # Test binary representation - self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', + self.assertEqual(b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', coder.encode(window.GlobalWindows.windowed_value(1))) # Test decoding large timestamp self.assertEqual( - coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'), + coder.decode(b'\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'), windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),))) # Test unnested @@ -348,7 +353,7 @@ def test_windowed_value_coder(self): coders.WindowedValueCoder(coders.FloatCoder()), coders.WindowedValueCoder(coders.StrUtf8Coder()))), (windowed_value.WindowedValue(1.5, 0, ()), - windowed_value.WindowedValue("abc", 10, ('window',)))) + windowed_value.WindowedValue(u"abc", 10, (u'window',)))) def test_proto_coder(self): # For instructions on how these test proto message were generated, @@ -364,7 +369,7 @@ def test_proto_coder(self): proto_coder = coders.ProtoCoder(ma.__class__) self.check_coder(proto_coder, ma) self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())), - (ma, 'a'), (mb, 'b')) + (ma, b'a'), (mb, b'b')) def test_global_window_coder(self): coder = coders.GlobalWindowCoder() @@ -391,16 +396,16 @@ def test_length_prefix_coder(self): }, coder.as_cloud_object()) # Test binary representation - self.assertEqual('\x00', coder.encode('')) - self.assertEqual('\x01a', coder.encode('a')) - self.assertEqual('\x02bc', coder.encode('bc')) - self.assertEqual('\xff\x7f' + 'z' * 16383, coder.encode('z' * 16383)) + self.assertEqual(b'\x00', coder.encode(b'')) + self.assertEqual(b'\x01a', coder.encode(b'a')) + self.assertEqual(b'\x02bc', coder.encode(b'bc')) + self.assertEqual(b'\xff\x7f' + b'z' * 16383, coder.encode(b'z' * 16383)) # Test unnested - self.check_coder(coder, '', 'a', 'bc', 'def') + self.check_coder(coder, b'', b'a', b'bc', b'def') # Test nested self.check_coder(coders.TupleCoder((coder, coder)), - ('', 'a'), - ('bc', 
'def')) + (b'', b'a'), + (b'bc', b'def')) def test_nested_observables(self): class FakeObservableIterator(observable.ObservableMixin): diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index ade9b722c6ea..6aed98676b6b 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -23,7 +23,7 @@ cdef class OutputStream(object): cdef size_t buffer_size cdef size_t pos - cpdef write(self, bytes b, bint nested=*) + cpdef write(self, b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_var_int64(self, libc.stdint.int64_t v) cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v) @@ -39,7 +39,7 @@ cdef class OutputStream(object): cdef class ByteCountingOutputStream(OutputStream): cdef size_t count - cpdef write(self, bytes b, bint nested=*) + cpdef write(self, b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_bigendian_int64(self, libc.stdint.int64_t val) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val) diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index 7c9521a86379..58f2d03cad60 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -39,7 +39,7 @@ cdef class OutputStream(object): if self.data: libc.stdlib.free(self.data) - cpdef write(self, bytes b, bint nested=False): + cpdef write(self, b, bint nested=False): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) @@ -122,7 +122,7 @@ cdef class ByteCountingOutputStream(OutputStream): def __cinit__(self): self.count = 0 - cpdef write(self, bytes b, bint nested=False): + cpdef write(self, b, bint nested=False): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) From f48f3f9dd7f0fedd72fa4747959098e266c07807 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Mon, 2 Apr 2018 23:19:08 +0200 Subject: [PATCH 13/19] Fix custom coders --- sdks/python/apache_beam/coders/coders_test_common.py | 6 +++--- sdks/python/apache_beam/coders/typecoders_test.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index e24eddadc8d6..7b709d3d6451 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -25,6 +25,7 @@ import unittest from builtins import int from builtins import range +from builtins import str import dill @@ -44,11 +45,10 @@ class CustomCoder(coders.Coder): def encode(self, x): - x = x + 1 - return int(x).to_bytes((x.bit_length() + 7) // 8, 'big', signed=True) + return str(x + 1).encode('latin-1') def decode(self, encoded): - return int.from_bytes(encoded, 'big', signed=True) - 1 + return int(encoded.decode('latin-1')) - 1 class CodersTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py index 3e01f896b276..fc08eedf1510 100644 --- a/sdks/python/apache_beam/coders/typecoders_test.py +++ b/sdks/python/apache_beam/coders/typecoders_test.py @@ -46,11 +46,10 @@ def __hash__(self): class CustomCoder(coders.Coder): def encode(self, value): - x = value.number - return int(x).to_bytes((x.bit_length() + 7) // 8, 'big', signed=True) + return str(value.number).encode('latin-1') def decode(self, encoded): - return CustomClass(int.from_bytes(encoded, 'big', signed=True) - 1) + return 
CustomClass(encoded.decode('latin-1')) def is_deterministic(self): # This coder is deterministic. Though we don't use need this coder to be From 835fded728bb6b6f3d59ef8966b69e05cb4c440f Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Mon, 2 Apr 2018 23:19:39 +0200 Subject: [PATCH 14/19] Add py2/3 cpickle -> pickle comment --- sdks/python/apache_beam/coders/coders.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index f9ea076f97ce..b943afd0c2b5 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -36,10 +36,11 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import proto_utils +# This is for py2/3 compatibility. cPickle was renamed pickle in python 3. try: - import cPickle as pickle + import cPickle as pickle # Python 2 except ImportError: - import pickle + import pickle # Python 3 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: From b23ee2514f1d0fac779a405521651d83d2946fc3 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Tue, 3 Apr 2018 19:16:05 +0200 Subject: [PATCH 15/19] Replace bytes with memoryview in stream cython files --- sdks/python/apache_beam/coders/stream.pxd | 4 ++-- sdks/python/apache_beam/coders/stream.pyx | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index 6aed98676b6b..df15c3086320 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -23,7 +23,7 @@ cdef class OutputStream(object): cdef size_t buffer_size cdef size_t pos - cpdef write(self, b, bint nested=*) + cpdef write(self, const unsigned char[:] b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_var_int64(self, libc.stdint.int64_t v) cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v) @@ -39,7 +39,7 @@ cdef class OutputStream(object): cdef class ByteCountingOutputStream(OutputStream): cdef size_t count - cpdef write(self, b, bint nested=*) + cpdef write(self, const unsigned char[:] b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_bigendian_int64(self, libc.stdint.int64_t val) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val) diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index 58f2d03cad60..414c294c3b9b 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -39,14 +39,15 @@ cdef class OutputStream(object): if self.data: libc.stdlib.free(self.data) - cpdef write(self, b, bint nested=False): + cpdef write(self, const unsigned char[:] b, bint nested=False): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) if self.buffer_size < self.pos + blen: self.extend(blen) - libc.string.memcpy(self.data + self.pos, b, blen) - self.pos += blen + if blen > 0: + libc.string.memcpy(self.data + self.pos, &b[0], blen) + self.pos += blen cpdef write_byte(self, unsigned char val): if self.buffer_size < self.pos + 1: @@ -122,7 +123,7 @@ cdef class ByteCountingOutputStream(OutputStream): def __cinit__(self): self.count = 0 - cpdef write(self, b, bint nested=False): + cpdef write(self, const unsigned char[:] b, bint nested=False): cdef size_t blen = len(b) if nested: self.write_var_int64(blen) From 4cd5a76d240a68abcab72d8ccb35333c57eb7141 Mon Sep 17 00:00:00 
2001 From: Robbe Sneyders Date: Tue, 3 Apr 2018 22:44:03 +0200 Subject: [PATCH 16/19] Revert isinstance checks to type checks Remove unicode & dict type from .pxd file because of incompatibility with future.builtins --- sdks/python/apache_beam/coders/coder_impl.pxd | 1 - sdks/python/apache_beam/coders/coder_impl.py | 33 ++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 98dd508556a0..e32fe93be235 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -74,7 +74,6 @@ cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE cdef class FastPrimitivesCoderImpl(StreamCoderImpl): cdef CoderImpl fallback_coder_impl - @cython.locals(unicode_value=unicode, dict_value=dict) cpdef encode_to_stream(self, value, OutputStream stream, bint nested) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 0c7d84b5be2b..9cad5ac99a8e 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -25,24 +25,35 @@ coder_impl.pxd file for type hints. For internal use only; no backwards-compatibility guarantees. + +isort:skip_file """ from __future__ import absolute_import from __future__ import division from __future__ import print_function +native_int = int + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +from builtins import bytes from builtins import chr +from builtins import dict from builtins import int from builtins import object from builtins import range from builtins import str +from past.builtins import dict as old_dict +from past.builtins import str as old_str +from past.builtins import long +from past.builtins import unicode + from apache_beam.coders import observable from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import Timestamp -# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from .stream import InputStream as create_InputStream from .stream import OutputStream as create_OutputStream @@ -277,32 +288,32 @@ def get_estimated_size_and_observables(self, value, nested=False): return out.get_count(), [] def encode_to_stream(self, value, stream, nested): - if value is None: + t = type(value) + if t is type(None): stream.write_byte(NONE_TYPE) - elif isinstance(value, bool): + elif t is bool: stream.write_byte(BOOL_TYPE) stream.write_byte(value) - elif isinstance(value, int): + elif t is int or t is native_int or t is long: stream.write_byte(INT_TYPE) stream.write_var_int64(value) - elif isinstance(value, float): + elif t is float: stream.write_byte(FLOAT_TYPE) stream.write_bigendian_double(value) - elif isinstance(value, bytes): + elif t is bytes or t is old_str: stream.write_byte(STR_TYPE) stream.write(value, nested) - elif isinstance(value, str): + elif t is str or t is unicode: unicode_value = value # for typing stream.write_byte(UNICODE_TYPE) stream.write(unicode_value.encode('utf-8'), nested) - elif isinstance(value, (list, tuple, set)): + elif t is list or t is tuple or t is set: stream.write_byte( - LIST_TYPE if isinstance(value, list) else TUPLE_TYPE if - isinstance(value, tuple) else SET_TYPE) + LIST_TYPE if t is list else TUPLE_TYPE if t is tuple else SET_TYPE) 
stream.write_var_int64(len(value)) for e in value: self.encode_to_stream(e, stream, True) - elif isinstance(value, dict): + elif t is dict or t is old_dict: dict_value = value # for typing stream.write_byte(DICT_TYPE) stream.write_var_int64(len(dict_value)) From f6dd643ecf75d4ad226531d1006c03c08e88eaea Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Thu, 5 Apr 2018 11:04:16 +0200 Subject: [PATCH 17/19] Split pylint for Python 3 compatibility into separate script --- sdks/python/run_pylint.sh | 91 ++++++++++------------------------ sdks/python/run_pylint_2to3.sh | 90 +++++++++++++++++++++++++++++++++ sdks/python/tox.ini | 2 +- 3 files changed, 116 insertions(+), 67 deletions(-) create mode 100644 sdks/python/run_pylint_2to3.sh diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index ccf37db8bc74..2505c20f2bb5 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -26,26 +26,16 @@ set -o errexit set -o pipefail -DEFAULT_MODULE=apache_beam +MODULE=apache_beam -usage(){ echo "Usage: $0 [MODULE|--help] -# The default MODULE is $DEFAULT_MODULE"; } +usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; } -MODULES=${DEFAULT_MODULE} -PY3K=false -while [[ $# -gt 0 ]] ; do - key="$1" - case ${key} in +if test $# -gt 0; then + case "$@" in --help) usage; exit 1;; - --py3k) PY3K=true; shift;; - *) - if [ ${MODULES} = ${DEFAULT_MODULE} ] ; then - MODULES=() - fi - MODULES+=("$1") - shift;; + *) MODULE="$*";; esac -done +fi # Following generated files are excluded from lint checks. EXCLUDED_GENERATED_FILES=( @@ -68,19 +58,15 @@ for file in "${EXCLUDED_GENERATED_FILES[@]}"; do done echo "Skipping lint for generated files: $FILES_TO_IGNORE" -echo "Running pylint for modules $( printf "%s " "${MODULES[@]}" ):" -pylint -j8 $( printf "%s " "${MODULES[@]}" ) \ - --ignore-patterns="$FILES_TO_IGNORE" \ - $( [ "$PY3K" = true ] && printf %s '--py3k' ) - -echo "Running pycodestyle for modules$( printf "%s " "${MODULES[@]}" ):" -pycodestyle $( printf "%s " "${MODULES[@]}" ) --exclude="$FILES_TO_IGNORE" -echo "Running flake8 for modules $( printf "%s " "${MODULES[@]}" ):" +echo "Running pylint for module $MODULE:" +pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE" +echo "Running pycodestyle for module $MODULE:" +pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE" +echo "Running flake8 for module $MODULE:" # TODO(BEAM-3959): Add F821 (undefined names) as soon as that test passes -flake8 $( printf "%s " "${MODULES[@]}" ) --count --select=E9,F822,F823 \ - --show-source --statistics +flake8 $MODULE --count --select=E9,F822,F823 --show-source --statistics -echo "Running isort for modules $( printf "%s " "${MODULES[@]}" ):" +echo "Running isort for module $MODULE:" # Skip files where isort is behaving weirdly ISORT_EXCLUDED=( "apiclient.py" @@ -100,44 +86,17 @@ done for file in "${EXCLUDED_GENERATED_FILES[@]}"; do SKIP_PARAM="$SKIP_PARAM --skip $(basename $file)" done +pushd "$MODULE" +isort -p apache_beam --line-width 120 --check-only --order-by-type --combine-star --force-single-line-imports --diff ${SKIP_PARAM} +popd -for module in "$MODULES"; do - pushd "$module" - isort -p apache_beam --line-width 120 --check-only --order-by-type \ - --combine-star --force-single-line-imports --diff ${SKIP_PARAM} - popd -done - -FUTURIZE_EXCLUDED=( - "typehints.py" - "pb2" - "trivial_infernce.py" -) -FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" ) -echo "Checking for files requiring stage 1 refactoring from futurize" -futurize_results=$(futurize -j 8 --stage1 
apache_beam 2>&1 |grep Refactored) -futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" \ - || echo "") -count=${#futurize_filtered} -if [ "$count" != "0" ]; then - echo "Some of the changes require futurize stage 1 changes." - echo "The files with required changes:" - echo "$futurize_filtered" - echo "You can run futurize apache_beam to see the proposed changes." +echo "Checking unittest.main for module ${MODULE}:" +TESTS_MISSING_MAIN=$(find ${MODULE} | grep '\.py$' | xargs grep -l '^import unittest$' | xargs grep -L unittest.main) +if [ -n "${TESTS_MISSING_MAIN}" ]; then + echo -e "\nThe following files are missing a call to unittest.main():" + for FILE in ${TESTS_MISSING_MAIN}; do + echo " ${FILE}" + done + echo exit 1 -fi -echo "No future changes needed" - -echo "Checking unittest.main for modules $( printf "%s " "${MODULES[@]}" ):" -for module in "$MODULES"; do - TESTS_MISSING_MAIN=$(find ${module} | grep '\.py$' | xargs grep -l \ - '^import unittest$' | xargs grep -L unittest.main) - if [ -n "${TESTS_MISSING_MAIN}" ]; then - echo -e "\nThe following files are missing a call to unittest.main():" - for FILE in ${TESTS_MISSING_MAIN}; do - echo " ${FILE}" - done - echo - exit 1 - fi -done \ No newline at end of file +fi \ No newline at end of file diff --git a/sdks/python/run_pylint_2to3.sh b/sdks/python/run_pylint_2to3.sh new file mode 100644 index 000000000000..63df49d40ece --- /dev/null +++ b/sdks/python/run_pylint_2to3.sh @@ -0,0 +1,90 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script will run pylint with the --py3k parameter to check for python +# 3 compatibility. This script can run on a list of modules provided as +# command line arguments. +# +# The exit-code of the script indicates success or a failure. + +set -o errexit +set -o pipefail + +DEFAULT_MODULE=apache_beam + +usage(){ echo "Usage: $0 [MODULE|--help] +# The default MODULE is $DEFAULT_MODULE"; } + +MODULES=${DEFAULT_MODULE} +while [[ $# -gt 0 ]] ; do + key="$1" + case ${key} in + --help) usage; exit 1;; + *) + if [ ${MODULES} = ${DEFAULT_MODULE} ] ; then + MODULES=() + fi + MODULES+=("$1") + shift;; + esac +done + +FUTURIZE_EXCLUDED=( + "typehints.py" + "pb2" + "trivial_infernce.py" +) +FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" ) +echo "Checking for files requiring stage 1 refactoring from futurize" +futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored) +futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" \ + || echo "") +count=${#futurize_filtered} +if [ "$count" != "0" ]; then + echo "Some of the changes require futurize stage 1 changes." + echo "The files with required changes:" + echo "$futurize_filtered" + echo "You can run futurize apache_beam to see the proposed changes." 
+ exit 1 +fi +echo "No future changes needed" + +# Following generated files are excluded from lint checks. +EXCLUDED_GENERATED_FILES=( +"apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py" +"apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py" +"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py" +"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py" +"apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py" +"apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py" +"apache_beam/coders/proto2_coder_test_messages_pb2.py" +apache_beam/portability/api/*pb2*.py +) + +FILES_TO_IGNORE="" +for file in "${EXCLUDED_GENERATED_FILES[@]}"; do + if test -z "$FILES_TO_IGNORE" + then FILES_TO_IGNORE="$(basename $file)" + else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)" + fi +done +echo "Skipping lint for generated files: $FILES_TO_IGNORE" + +echo "Running pylint --py3k for modules $( printf "%s " "${MODULES[@]}" ):" +pylint -j8 $( printf "%s " "${MODULES[@]}" ) \ + --ignore-patterns="$FILES_TO_IGNORE" --py3k \ No newline at end of file diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 314f86aa2a6f..aa7e4ca146e9 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -105,7 +105,7 @@ modules = commands = python --version pip --version - time {toxinidir}/run_pylint.sh --py3k {[testenv:py27-lint3]modules} + time {toxinidir}/run_pylint_2to3.sh {[testenv:py27-lint3]modules} [testenv:py3-lint] From 8e2d9b755fee50aa21d0770068a9c1978145d049 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Fri, 6 Apr 2018 10:29:59 +0200 Subject: [PATCH 18/19] Revert "Futurize and fix proto2_coder_test_messages_pb2.py" This reverts commit a9c0193 --- .../coders/proto2_coder_test_messages_pb2.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py b/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py index 6189c1feedb7..433d33f98841 100644 --- a/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py +++ b/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py @@ -17,15 +17,16 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: sdks/java/core/src/main/proto/proto2_coder_test_messages.proto -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function + +import sys from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database from google.protobuf import descriptor_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -35,7 +36,7 @@ name='apache_beam/coders/proto2_coder_test_messages.proto', package='proto2_coder_test_messages', syntax='proto2', - serialized_pb=b'\n3apache_beam/coders/proto2_coder_test_messages.proto\x12\x1aproto2_coder_test_messages\"P\n\x08MessageA\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\t\x12\x34\n\x06\x66ield2\x18\x02 \x03(\x0b\x32$.proto2_coder_test_messages.MessageB\"\x1a\n\x08MessageB\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\x08\"\x10\n\x08MessageC*\x04\x08\x64\x10j\"\xad\x01\n\x0eMessageWithMap\x12\x46\n\x06\x66ield1\x18\x01 \x03(\x0b\x32\x36.proto2_coder_test_messages.MessageWithMap.Field1Entry\x1aS\n\x0b\x46ield1Entry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:\x02\x38\x01\"V\n\x18ReferencesMessageWithMap\x12:\n\x06\x66ield1\x18\x01 \x03(\x0b\x32*.proto2_coder_test_messages.MessageWithMap:Z\n\x06\x66ield1\x12$.proto2_coder_test_messages.MessageC\x18\x65 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:Z\n\x06\x66ield2\x12$.proto2_coder_test_messages.MessageC\x18\x66 \x01(\x0b\x32$.proto2_coder_test_messages.MessageBB\x1c\n\x1aorg.apache.beam.sdk.coders' + serialized_pb=_b('\n3apache_beam/coders/proto2_coder_test_messages.proto\x12\x1aproto2_coder_test_messages\"P\n\x08MessageA\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\t\x12\x34\n\x06\x66ield2\x18\x02 \x03(\x0b\x32$.proto2_coder_test_messages.MessageB\"\x1a\n\x08MessageB\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\x08\"\x10\n\x08MessageC*\x04\x08\x64\x10j\"\xad\x01\n\x0eMessageWithMap\x12\x46\n\x06\x66ield1\x18\x01 \x03(\x0b\x32\x36.proto2_coder_test_messages.MessageWithMap.Field1Entry\x1aS\n\x0b\x46ield1Entry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:\x02\x38\x01\"V\n\x18ReferencesMessageWithMap\x12:\n\x06\x66ield1\x18\x01 \x03(\x0b\x32*.proto2_coder_test_messages.MessageWithMap:Z\n\x06\x66ield1\x12$.proto2_coder_test_messages.MessageC\x18\x65 \x01(\x0b\x32$.proto2_coder_test_messages.MessageA:Z\n\x06\x66ield2\x12$.proto2_coder_test_messages.MessageC\x18\x66 \x01(\x0b\x32$.proto2_coder_test_messages.MessageBB\x1c\n\x1aorg.apache.beam.sdk.coders') ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -68,7 +69,7 @@ _descriptor.FieldDescriptor( name='field1', full_name='proto2_coder_test_messages.MessageA.field1', index=0, number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=u"", + has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), @@ -161,7 +162,7 @@ _descriptor.FieldDescriptor( name='key', full_name='proto2_coder_test_messages.MessageWithMap.Field1Entry.key', index=0, number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=u"", + has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, 
containing_type=None, is_extension=False, extension_scope=None, options=None), @@ -178,8 +179,7 @@ nested_types=[], enum_types=[ ], - options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), - b'8\001'), + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), is_extendable=False, syntax='proto2', extension_ranges=[], @@ -312,9 +312,7 @@ MessageC.RegisterExtension(field2) DESCRIPTOR.has_options = True -DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), - b'\n\032org.apache.beam.sdk.coders') +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\032org.apache.beam.sdk.coders')) _MESSAGEWITHMAP_FIELD1ENTRY.has_options = True -_MESSAGEWITHMAP_FIELD1ENTRY._options = _descriptor._ParseOptions( - descriptor_pb2.MessageOptions(), b'8\001') +_MESSAGEWITHMAP_FIELD1ENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) # @@protoc_insertion_point(module_scope) From 348605f94883a2166ad00705fa0cdb406a7a6da4 Mon Sep 17 00:00:00 2001 From: Robbe Sneyders Date: Thu, 5 Apr 2018 18:04:13 +0200 Subject: [PATCH 19/19] Fix some builtin typechecks --- sdks/python/apache_beam/coders/coder_impl.pxd | 1 + sdks/python/apache_beam/coders/coder_impl.py | 6 ++--- sdks/python/apache_beam/coders/typecoders.py | 22 ++++++++++++++----- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index e32fe93be235..dd82a00ab1a0 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -74,6 +74,7 @@ cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE cdef class FastPrimitivesCoderImpl(StreamCoderImpl): cdef CoderImpl fallback_coder_impl + @cython.locals(dict_value = dict) cpdef encode_to_stream(self, value, OutputStream stream, bint nested) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 9cad5ac99a8e..ee58c3fb0431 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -37,13 +37,11 @@ # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports from builtins import bytes from builtins import chr -from builtins import dict from builtins import int from builtins import object from builtins import range from builtins import str -from past.builtins import dict as old_dict from past.builtins import str as old_str from past.builtins import long from past.builtins import unicode @@ -289,7 +287,7 @@ def get_estimated_size_and_observables(self, value, nested=False): def encode_to_stream(self, value, stream, nested): t = type(value) - if t is type(None): + if value is None: stream.write_byte(NONE_TYPE) elif t is bool: stream.write_byte(BOOL_TYPE) @@ -313,7 +311,7 @@ def encode_to_stream(self, value, stream, nested): stream.write_var_int64(len(value)) for e in value: self.encode_to_stream(e, stream, True) - elif t is dict or t is old_dict: + elif t is dict: dict_value = value # for typing stream.write_byte(DICT_TYPE) stream.write_var_int64(len(dict_value)) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index c8eafeaf79c3..92c0c161c6f4 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -67,6 +67,8 @@ def MakeXyzs(v): from __future__ import division from __future__ import 
print_function +from builtins import bytes +from builtins import int from builtins import object from builtins import str @@ -108,11 +110,21 @@ def register_coder(self, typehint_type, typehint_coder_class): self._register_coder_internal(typehint_type, typehint_coder_class) def get_coder(self, typehint): - coder = self._coders.get( - typehint.__class__ if isinstance(typehint, typehints.TypeConstraint) - else typehint, None) - if isinstance(typehint, typehints.TypeConstraint) and coder is not None: - return coder.from_type_hint(typehint, self) + if isinstance(typehint, typehints.TypeConstraint): + coder = self._coders.get(typehint.__class__) + if coder is not None: + return coder.from_type_hint(typehint, self) + else: + try: + t = typehint() + coder = self._coders.get( + str if isinstance(t, str) + else bytes if isinstance(t, bytes) + else int if isinstance(t, int) and not isinstance(t, bool) + else typehint, None) + except TypeError: + # typehint cannot be instantiated (without arguments) + coder = self._coders.get(typehint, None) if coder is None: # We use the fallback coder when there is no coder registered for a # typehint. For example a user defined class with no coder specified.
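The reworked get_coder dispatch above instantiates a non-TypeConstraint typehint and routes the lookup through isinstance checks, so that a typehint given as a subclass of a native type (such as the builtins types imported at the top of the module) can still resolve to the coder registered for str, bytes or int, while bool is kept out of the int branch because bool is a subclass of int. Below is a minimal, self-contained sketch of that dispatch under Python 3 semantics; the registry and the Dummy* coder names are hypothetical stand-ins for illustration, not apache_beam APIs.

# Standalone illustration (Python 3); the registry and Dummy* names are
# hypothetical stand-ins, not apache_beam APIs.

def normalize_typehint(typehint):
    """Map a concrete type onto the native type its coder is keyed under.

    Mirrors the dispatch in the patched get_coder(): instantiate the
    typehint and use isinstance checks so that subclasses of the native
    types still find the coder registered for the base type. bool is
    excluded explicitly because bool is a subclass of int.
    """
    try:
        value = typehint()
    except TypeError:
        # The typehint cannot be instantiated without arguments;
        # fall back to looking it up under its own class.
        return typehint
    if isinstance(value, str):
        return str
    if isinstance(value, bytes):
        return bytes
    if isinstance(value, int) and not isinstance(value, bool):
        return int
    return typehint


coders = {str: 'DummyStrUtf8Coder', bytes: 'DummyBytesCoder', int: 'DummyVarIntCoder'}

class BackportedInt(int):
    """Stand-in for a subclass of the native int, e.g. a builtins backport."""

assert normalize_typehint(BackportedInt) is int        # subclass finds the int coder
assert coders[normalize_typehint(BackportedInt)] == 'DummyVarIntCoder'
assert normalize_typehint(bool) is bool                 # bool never takes the int coder
assert normalize_typehint(dict) is dict                 # unknown types fall through unchanged

Keying the registry on a small set of canonical types and normalizing at lookup time avoids registering a coder for every subclass, at the cost of creating a throwaway instance per lookup.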