From 9850d3f5bb37786d9f3decf308157ab6a91e1f46 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 9 Jan 2018 15:15:52 -0800 Subject: [PATCH 1/3] First pass at fixing all of E999 (invalid parsing) errors in Py3 found by flake8 Mini style fixes Quick unpacking fixes Fix type tests to match the changed func Fix band to bandc oops Passe the message text through Switch mostly to raise_ even though raise_with_traceback should be closer. Make mini_py3lint executable Install future as well Fix tfidf div_word_count_by_total function (oops) Take @aaltay's comments into account Update comment, explicitly unpack tuple in test. Standardize on six to match @luke-zhu's work Add the envlists Remove lint_py3 for now --- .../examples/complete/game/game_stats.py | 4 +- .../apache_beam/examples/complete/tfidf.py | 8 +++- sdks/python/apache_beam/runners/common.py | 4 +- .../apache_beam/runners/direct/executor.py | 4 +- .../runners/portability/fn_api_runner.py | 2 +- .../runners/portability/fn_api_runner_test.py | 4 +- .../universal_local_runner_test.py | 7 ++-- .../apache_beam/runners/worker/data_plane.py | 4 +- .../runners/worker/data_plane_test.py | 3 +- .../apache_beam/runners/worker/sdk_worker.py | 4 +- sdks/python/apache_beam/transforms/util.py | 7 +++- .../python/apache_beam/typehints/typecheck.py | 11 ++--- .../apache_beam/typehints/typehints_test.py | 10 +++-- sdks/python/apache_beam/utils/retry.py | 3 +- sdks/python/run_mini_py3lint.sh | 40 +++++++++++++++++++ sdks/python/run_pylint.sh | 3 ++ sdks/python/tox.ini | 21 +++++++++- 17 files changed, 112 insertions(+), 27 deletions(-) create mode 100755 sdks/python/run_mini_py3lint.sh diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py index f9ccdc065e88..32f6f15e3a83 100644 --- a/sdks/python/apache_beam/examples/complete/game/game_stats.py +++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py @@ -192,8 +192,8 @@ def expand(self, user_scores): sum_scores # Use the derived mean total score (global_mean_score) as a side input. | 'ProcessAndFilter' >> beam.Filter( - lambda (_, score), global_mean:\ - score > global_mean * self.SCORE_WEIGHT, + lambda key_score, global_mean:\ + key_score[1] > global_mean * self.SCORE_WEIGHT, global_mean_score)) return filtered # [END abuse_detect] diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 065e4b364686..20ad344e96a8 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -147,14 +147,18 @@ def compute_term_frequency(uri_count_and_total): # # This calculation uses a side input, a Dataflow-computed auxiliary value # presented to each invocation of our MapFn lambda. The second argument to - # the lambda (called total---note that we are unpacking the first argument) + # the function (called total---note that the first argument is a tuple) # receives the value we listed after the lambda in Map(). Additional side # inputs (and ordinary Python values, too) can be provided to MapFns and # DoFns in this way. 
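Both the game_stats.py hunk above and the tfidf.py helper just below fix the same Python 3 incompatibility: PEP 3113 removed tuple unpacking in function and lambda parameter lists, so `lambda (_, score), global_mean: ...` is a parse error (flake8 E999) under Python 3. The portable form takes a single argument and indexes or unpacks it in the body; a standalone sketch with made-up scores (the filter helper name is illustrative, not Beam's):

    SCORE_WEIGHT = 2.5
    global_mean_score = 10.0
    user_scores = [('alice', 12.0), ('bob', 40.0)]

    # Py2 only:  lambda (_, score), global_mean: score > global_mean * SCORE_WEIGHT
    def is_spammy(key_score, global_mean):
        return key_score[1] > global_mean * SCORE_WEIGHT

    spammy = [kv for kv in user_scores if is_spammy(kv, global_mean_score)]
    print(spammy)  # [('bob', 40.0)]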
+ def div_word_count_by_total(word_count, total): + (word, count) = word_count + return (word, float(count) / total) + word_to_df = ( word_to_doc_count | 'ComputeDocFrequencies' >> beam.Map( - lambda (word, count), total: (word, float(count) / total), + div_word_count_by_total, AsSingleton(total_documents))) # Join the term frequency and document frequency collections, diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d5ca68307aa5..39e5e99015d1 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -25,6 +25,8 @@ import sys import traceback +import six + from apache_beam.internal import util from apache_beam.metrics.execution import ScopedMetricsContainer from apache_beam.pvalue import TaggedOutput @@ -512,7 +514,7 @@ def _reraise_augmented(self, exn): traceback.format_exception_only(type(exn), exn)[-1].strip() + step_annotation) new_exn._tagged_with_step = True - raise new_exn, None, original_traceback + six.raise_from(new_exn, original_traceback) class OutputProcessor(object): diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 34c12c345a0e..91ea5e8668e7 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -28,6 +28,8 @@ import traceback from weakref import WeakValueDictionary +import six + from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import ScopedMetricsContainer @@ -398,7 +400,7 @@ def await_completion(self): try: if update.exception: t, v, tb = update.exc_info - raise t, v, tb + six.reraise(t, v, tb) finally: self.executor_service.shutdown() self.executor_service.await_completion() diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index c36bae7114a6..5c1b1de12aa1 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1162,7 +1162,7 @@ def run(self): self._latest_progress = progress_result.process_bundle_progress if self._callback: self._callback(self._latest_progress) - except Exception, exn: + except Exception as exn: logging.error("Bad progress: %s", exn) time.sleep(self._frequency) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index e27da621e485..c0f17cec563c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
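The runners/common.py, direct/executor.py, and fn_api_runner.py hunks replace two constructs Python 3 no longer parses: the three-argument `raise type, value, traceback` statement and `except Exception, exn`. six offers two replacements with different shapes: six.reraise(exc_type, exc_value, traceback) re-raises with an explicit traceback, while six.raise_from(value, from_value) chains a new exception to its cause; PATCH 2/3 below settles the (type, value, traceback) call sites on reraise. A rough sketch of the augment-and-rethrow pattern, independent of the Beam classes and with an illustrative step name:

    import sys
    import six

    def augment_and_reraise(step_name):
        try:
            {}['missing']          # stand-in for user DoFn code
        except Exception as exn:   # "except Exception, exn" is a Py3 parse error
            exc_type, exc_value, tb = sys.exc_info()
            new_exn = type(exn)('%s [while running %r]' % (exn, step_name))
            six.reraise(type(new_exn), new_exn, tb)  # keep the original traceback

    augment_and_reraise('ParseEvents')  # raises KeyError with the augmented message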
# +from __future__ import print_function + import functools import logging import time @@ -218,7 +220,7 @@ def test_progress_metrics(self): m_out.processed_elements.measured.output_element_counts['twice']) except: - print res._metrics_by_stage + print(res._metrics_by_stage) raise # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py index 1fc244b20f20..66181c014f71 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import print_function import logging import platform @@ -41,13 +42,13 @@ def setUp(self): if platform.system() != 'Windows': def handler(signum, frame): msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS - print '=' * 20, msg, '=' * 20 + print('=' * 20, msg, '=' * 20) traceback.print_stack(frame) threads_by_id = {th.ident: th for th in threading.enumerate()} for thread_id, stack in sys._current_frames().items(): th = threads_by_id.get(thread_id) - print - print '# Thread:', th or thread_id + print() + print('# Thread:', th or thread_id) traceback.print_stack(stack) raise BaseException(msg) signal.signal(signal.SIGALRM, handler) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 2e4f2d6f69a7..f95cd14dc74f 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -29,6 +29,7 @@ import threading import grpc +import six from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 @@ -182,7 +183,8 @@ def input_elements(self, instruction_id, expected_targets): data = received.get(timeout=1) except queue.Empty: if self._exc_info: - raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + t, v, tb = self._exc_info + six.raise_from(t, v, tb) else: if not data.data and data.target in expected_targets: done_targets.append(data.target) diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index 07ba8fd44f1f..b70351f203ae 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc +import six from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -50,7 +51,7 @@ def call_fn(): thread.join(timeout_secs) if exc_info: t, v, tb = exc_info # pylint: disable=unbalanced-tuple-unpacking - raise t, v, tb + six.raise_from(t, v, tb) assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs return wrapper return decorate diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 2767530adb0b..74d12594be4a 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc +import six from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -288,7 
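The test modules switch their bare print statements to the print() function and add `from __future__ import print_function` so the calls behave the same on Python 2; the statement form is another construct flake8 flags as E999 under Python 3. In short:

    from __future__ import print_function  # no-op on Python 3

    msg = 'Timed out after %s seconds.' % 60
    # Py2 only:  print '=' * 20, msg, '=' * 20
    print('=' * 20, msg, '=' * 20)
    print()  # a bare "print" statement becomes an explicit empty call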
+289,8 @@ def _blocking_request(self, request): self._requests.put(request) while not future.wait(timeout=1): if self._exc_info: - raise self._exc_info[0], self._exc_info[1], self._exc_info[2] + t, v, tb = self._exc_info + six.raise_from(t, v, tb) elif self._done: raise RuntimeError() del self._responses_by_id[request.id] diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 8185e64a67cf..7bd5d45fa4b5 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -259,8 +259,13 @@ def _thin_data(self): sorted_data = sorted(self._data) odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else [] # Sort the pairs by how different they are. + + def div_keys(kv1_kv2): + (x1, _), (x2, _) = kv1_kv2 + return x2 / x1 + pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), - key=lambda ((x1, _1), (x2, _2)): x2 / x1) + key=div_keys) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. threshold = 2 * len(pairs) / 3 self._data = ( diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index 7c7012c8aea7..b22be1ef2c89 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -20,6 +20,7 @@ For internal use only; no backwards-compatibility guarantees. """ + import collections import inspect import sys @@ -86,7 +87,7 @@ def wrapper(self, method, args, kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within ParDo(%s): ' '%s' % (self.full_label, e)) - six.reraise(TypeCheckError, error_msg, sys.exc_info()[2]) + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) else: return self._check_type(result) @@ -175,12 +176,12 @@ def _type_check(self, type_constraint, datum, is_input): try: check_constraint(type_constraint, datum) except CompositeTypeHintError as e: - six.reraise(TypeCheckError, e.args[0], sys.exc_info()[2]) + six.raise_from(TypeCheckError(e.message), sys.exc_info()[2]) except SimpleTypeHintError: error_msg = ("According to type-hint expected %s should be of type %s. " "Instead, received '%s', an instance of type %s." 
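transforms/util.py runs into the same PEP 3113 limit inside a sort key: `key=lambda ((x1, _1), (x2, _2)): x2 / x1` no longer parses, so the pair-of-pairs unpacking moves into the named div_keys helper. A self-contained sketch of that pattern, with made-up two-element samples standing in for the estimator's data:

    sorted_data = [(1.0, 0.1), (1.5, 0.2), (2.0, 0.1), (6.0, 0.5)]

    def div_keys(kv1_kv2):
        # Py2 allowed key=lambda ((x1, _1), (x2, _2)): x2 / x1; Py3 does not.
        (x1, _), (x2, _) = kv1_kv2
        return x2 / x1

    # pair up neighbouring samples and rank pairs by how different they are
    pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), key=div_keys)
    print(pairs)  # [((1.0, 0.1), (1.5, 0.2)), ((2.0, 0.1), (6.0, 0.5))]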
% (datum_type, type_constraint, datum, type(datum))) - six.reraise(TypeCheckError, error_msg, sys.exc_info()[2]) + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) class TypeCheckCombineFn(core.CombineFn): @@ -205,7 +206,7 @@ def add_input(self, accumulator, element, *args, **kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within %s: ' '%s' % (self._label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) return self._combinefn.add_input(accumulator, element, *args, **kwargs) def merge_accumulators(self, accumulators, *args, **kwargs): @@ -220,7 +221,7 @@ def extract_output(self, accumulator, *args, **kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within %s: ' '%s' % (self._label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) return result diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index 2994adc0aa5f..70ebcb3a3ff1 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -1046,15 +1046,17 @@ def test_positional_arg_hints(self): _positional_arg_hints(['x', 'y'], {'x': int})) def test_getcallargs_forhints(self): - func = lambda a, (b, c), *d: None + def func(a, b_c, *d): + b, c = b_c # pylint: disable=unused-variable + return None self.assertEquals( - {'a': Any, 'b': Any, 'c': Any, 'd': Tuple[Any, ...]}, + {'a': Any, 'b_c': Any, 'd': Tuple[Any, ...]}, getcallargs_forhints(func, *[Any, Any])) self.assertEquals( - {'a': Any, 'b': Any, 'c': Any, 'd': Tuple[Any, ...]}, + {'a': Any, 'b_c': Any, 'd': Tuple[Any, ...]}, getcallargs_forhints(func, *[Any, Any, Any, int])) self.assertEquals( - {'a': int, 'b': str, 'c': Any, 'd': Tuple[Any, ...]}, + {'a': int, 'b_c': Tuple[str, Any], 'd': Tuple[Any, ...]}, getcallargs_forhints(func, *[int, Tuple[str, Any]])) diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 97bd03798509..d1d1fa2f371e 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -25,6 +25,7 @@ needed right now use a @retry.no_retries decorator. """ + import logging import random import sys @@ -187,7 +188,7 @@ def wrapper(*args, **kwargs): sleep_interval = next(retry_intervals) except StopIteration: # Re-raise the original exception since we finished the retries. - six.reraise(exn, None, exn_traceback) # pylint: disable=raising-bad-type + six.raise_from(exn, exn_traceback) logger( 'Retry with exponential backoff: waiting for %s seconds before ' diff --git a/sdks/python/run_mini_py3lint.sh b/sdks/python/run_mini_py3lint.sh new file mode 100755 index 000000000000..f5e9acd70b3a --- /dev/null +++ b/sdks/python/run_mini_py3lint.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
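typecheck.py and retry.py drop the remaining Python 2 raise statements in favour of six.raise_from, which takes an already-constructed exception and, on Python 3, records the second argument as the new exception's cause. A minimal sketch of the wrap-and-rethrow pattern used in TypeCheckCombineFn; the helper name and label are illustrative, and it passes the caught exception itself as the cause:

    import six

    class TypeCheckError(Exception):
        pass

    def check_is_int(label, element):
        try:
            if not isinstance(element, int):
                raise TypeError('expected int, got %s' % type(element).__name__)
        except TypeError as e:
            error_msg = ('Runtime type violation detected within %s: %s'
                         % (label, e))
            six.raise_from(TypeCheckError(error_msg), e)

    check_is_int('SumScores/CombinePerKey', 'not-an-int')  # raises TypeCheckError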
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script will run python3 ready style checks +# +# Currently only flake8 E999 +# +# The exit-code of the script indicates success or a failure. + +set -o errexit +set -o pipefail + +MODULE=apache_beam + +usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; } + +if test $# -gt 0; then + case "$@" in + --help) usage; exit 1;; + *) MODULE="$*";; + esac +fi + +echo "Running flake8 for module $MODULE:" +flake8 $MODULE --count --select=E999 --show-source --statistics diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index d2d424e28d9e..5b9e1ed027a2 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -62,6 +62,8 @@ echo "Running pylint for module $MODULE:" pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE" echo "Running pycodestyle for module $MODULE:" pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE" +echo "Running flake8 for module $MODULE:" +flake8 $MODULE --count --select=E999 --show-source --statistics echo "Running isort for module $MODULE:" # Skip files where isort is behaving weirdly ISORT_EXCLUDED=( @@ -69,6 +71,7 @@ ISORT_EXCLUDED=( "avroio_test.py" "datastore_wordcount.py" "datastoreio_test.py" + "hadoopfilesystem.py" "iobase_test.py" "fast_coders_test.py" "slow_coders_test.py" diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 31bcc9cdb572..857e7b0e571f 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -17,7 +17,8 @@ [tox] # new environments will be excluded by default unless explicitly added to envlist. -envlist = py27,py27gcp,py27cython,lint,docs +# TODO (after BEAM-3671) add lint_py3 back in. +envlist = py27,py27gcp,py27cython,lint_py2,docs toxworkdir = {toxinidir}/target/.tox [pycodestyle] @@ -90,13 +91,14 @@ commands = python setup.py test passenv = TRAVIS* -[testenv:lint] +[testenv:lint_py2] deps= nose==1.3.7 pycodestyle==2.3.1 pylint==1.7.2 future==0.16.0 isort==4.2.15 + flake8==3.5.0 whitelist_externals=time commands = python --version @@ -105,6 +107,21 @@ commands = time {toxinidir}/run_pylint.sh passenv = TRAVIS* +[testenv:lint_py3] +deps= + nose==1.3.7 + pycodestyle==2.3.1 + pylint==1.7.2 + future==0.16.0 + isort==4.2.15 + flake8==3.5.0 +whitelist_externals=time +commands = + time pip install -e .[test] + time {toxinidir}/run_mini_py3lint.sh +passenv = TRAVIS* + + [testenv:docs] deps= nose==1.3.7 From 957937298f120f712048859b4f16885618a0eace Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 23 Feb 2018 17:21:48 -0800 Subject: [PATCH 2/3] Fix some raise_from to reraise. 
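run_mini_py3lint.sh and the extra step in run_pylint.sh both invoke `flake8 $MODULE --count --select=E999 --show-source --statistics`, i.e. only the E999 check: flake8 emits E999 when a file cannot be parsed at all, which under a Python 3 interpreter is exactly how the Py2-only syntax above surfaces, with no ordinary style rules enabled. The condition it reports can be reproduced with nothing more than the builtin compile():

    # Running under Python 3, each of these snippets fails to compile --
    # which is what flake8 --select=E999 reports per file.
    py2_only_snippets = [
        "print 'hello'",
        "try:\n    pass\nexcept Exception, exn:\n    pass",
        "raise ValueError, 'boom', None",
        "f = lambda (a, b): a + b",
    ]

    for source in py2_only_snippets:
        try:
            compile(source, '<snippet>', 'exec')
        except SyntaxError as err:
            print('E999-style failure: %s' % err)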
--- sdks/python/apache_beam/runners/worker/data_plane.py | 2 +- sdks/python/apache_beam/runners/worker/data_plane_test.py | 2 +- sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index f95cd14dc74f..f554646c6596 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -184,7 +184,7 @@ def input_elements(self, instruction_id, expected_targets): except queue.Empty: if self._exc_info: t, v, tb = self._exc_info - six.raise_from(t, v, tb) + six.reraise(t, v, tb) else: if not data.data and data.target in expected_targets: done_targets.append(data.target) diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index b70351f203ae..b2cefbe1469c 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -51,7 +51,7 @@ def call_fn(): thread.join(timeout_secs) if exc_info: t, v, tb = exc_info # pylint: disable=unbalanced-tuple-unpacking - six.raise_from(t, v, tb) + six.reraise(t, v, tb) assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs return wrapper return decorate diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 74d12594be4a..d54c82a8a655 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -290,7 +290,7 @@ def _blocking_request(self, request): while not future.wait(timeout=1): if self._exc_info: t, v, tb = self._exc_info - six.raise_from(t, v, tb) + six.reraise(t, v, tb) elif self._done: raise RuntimeError() del self._responses_by_id[request.id] From 9f7aa9bea31fa37a6b7359cf3be3ced5f9630200 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 23 Feb 2018 18:40:17 -0800 Subject: [PATCH 3/3] vcfio somehow has some sort issues. It's not overly important and hopefully we can remove isort after the py3 migration is complete and just depend on pylint. --- sdks/python/run_pylint.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 5b9e1ed027a2..8696a8272d4e 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -75,6 +75,7 @@ ISORT_EXCLUDED=( "iobase_test.py" "fast_coders_test.py" "slow_coders_test.py" + "vcfio.py" ) SKIP_PARAM="" for file in "${ISORT_EXCLUDED[@]}"; do
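PATCH 2/3 moves the three worker call sites from six.raise_from(t, v, tb) back to six.reraise(t, v, tb), matching reraise's (type, value, traceback) signature so the traceback captured on the worker thread survives the hand-off to the caller. A minimal standalone version of that store-exc_info-then-reraise pattern (the thread setup is illustrative, not Beam's):

    import sys
    import threading
    import six

    exc_info = [None]

    def worker():
        try:
            1 / 0  # stand-in for the bundle-processing work
        except Exception:
            exc_info[0] = sys.exc_info()  # capture (type, value, traceback)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()

    if exc_info[0]:
        t, v, tb = exc_info[0]
        six.reraise(t, v, tb)  # ZeroDivisionError surfaces with the worker's traceback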