diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py index f9ccdc065e88..32f6f15e3a83 100644 --- a/sdks/python/apache_beam/examples/complete/game/game_stats.py +++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py @@ -192,8 +192,8 @@ def expand(self, user_scores): sum_scores # Use the derived mean total score (global_mean_score) as a side input. | 'ProcessAndFilter' >> beam.Filter( - lambda (_, score), global_mean:\ - score > global_mean * self.SCORE_WEIGHT, + lambda key_score, global_mean:\ + key_score[1] > global_mean * self.SCORE_WEIGHT, global_mean_score)) return filtered # [END abuse_detect] diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 065e4b364686..20ad344e96a8 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -147,14 +147,18 @@ def compute_term_frequency(uri_count_and_total): # # This calculation uses a side input, a Dataflow-computed auxiliary value # presented to each invocation of our MapFn lambda. The second argument to - # the lambda (called total---note that we are unpacking the first argument) + # the function (called total---note that the first argument is a tuple) # receives the value we listed after the lambda in Map(). Additional side # inputs (and ordinary Python values, too) can be provided to MapFns and # DoFns in this way. + def div_word_count_by_total(word_count, total): + (word, count) = word_count + return (word, float(count) / total) + word_to_df = ( word_to_doc_count | 'ComputeDocFrequencies' >> beam.Map( - lambda (word, count), total: (word, float(count) / total), + div_word_count_by_total, AsSingleton(total_documents))) # Join the term frequency and document frequency collections, diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d5ca68307aa5..39e5e99015d1 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -25,6 +25,8 @@ import sys import traceback +import six + from apache_beam.internal import util from apache_beam.metrics.execution import ScopedMetricsContainer from apache_beam.pvalue import TaggedOutput @@ -512,7 +514,7 @@ def _reraise_augmented(self, exn): traceback.format_exception_only(type(exn), exn)[-1].strip() + step_annotation) new_exn._tagged_with_step = True - raise new_exn, None, original_traceback + six.raise_from(new_exn, original_traceback) class OutputProcessor(object): diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 34c12c345a0e..91ea5e8668e7 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -28,6 +28,8 @@ import traceback from weakref import WeakValueDictionary +import six + from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import ScopedMetricsContainer @@ -398,7 +400,7 @@ def await_completion(self): try: if update.exception: t, v, tb = update.exc_info - raise t, v, tb + six.reraise(t, v, tb) finally: self.executor_service.shutdown() self.executor_service.await_completion() diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index c36bae7114a6..5c1b1de12aa1 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1162,7 +1162,7 @@ def run(self): self._latest_progress = progress_result.process_bundle_progress if self._callback: self._callback(self._latest_progress) - except Exception, exn: + except Exception as exn: logging.error("Bad progress: %s", exn) time.sleep(self._frequency) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index e27da621e485..c0f17cec563c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import print_function + import functools import logging import time @@ -218,7 +220,7 @@ def test_progress_metrics(self): m_out.processed_elements.measured.output_element_counts['twice']) except: - print res._metrics_by_stage + print(res._metrics_by_stage) raise # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py index 1fc244b20f20..66181c014f71 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import print_function import logging import platform @@ -41,13 +42,13 @@ def setUp(self): if platform.system() != 'Windows': def handler(signum, frame): msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS - print '=' * 20, msg, '=' * 20 + print('=' * 20, msg, '=' * 20) traceback.print_stack(frame) threads_by_id = {th.ident: th for th in threading.enumerate()} for thread_id, stack in sys._current_frames().items(): th = threads_by_id.get(thread_id) - print - print '# Thread:', th or thread_id + print() + print('# Thread:', th or thread_id) traceback.print_stack(stack) raise BaseException(msg) signal.signal(signal.SIGALRM, handler) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 2e4f2d6f69a7..f554646c6596 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -29,6 +29,7 @@ import threading import grpc +import six from apache_beam.coders import coder_impl from apache_beam.portability.api import beam_fn_api_pb2 @@ -182,7 +183,8 @@ def input_elements(self, instruction_id, expected_targets): data = received.get(timeout=1) except queue.Empty: if self._exc_info: - raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + t, v, tb = self._exc_info + six.reraise(t, v, tb) else: if not data.data and data.target in expected_targets: done_targets.append(data.target) diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index 07ba8fd44f1f..b2cefbe1469c 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc +import six from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -50,7 +51,7 @@ def call_fn(): thread.join(timeout_secs) if exc_info: t, v, tb = exc_info # pylint: disable=unbalanced-tuple-unpacking - raise t, v, tb + six.reraise(t, v, tb) assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs return wrapper return decorate diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 2767530adb0b..d54c82a8a655 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc +import six from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc @@ -288,7 +289,8 @@ def _blocking_request(self, request): self._requests.put(request) while not future.wait(timeout=1): if self._exc_info: - raise self._exc_info[0], self._exc_info[1], self._exc_info[2] + t, v, tb = self._exc_info + six.reraise(t, v, tb) elif self._done: raise RuntimeError() del self._responses_by_id[request.id] diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 8185e64a67cf..7bd5d45fa4b5 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -259,8 +259,13 @@ def _thin_data(self): sorted_data = sorted(self._data) odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else [] # Sort the pairs by how different they are. + + def div_keys(kv1_kv2): + (x1, _), (x2, _) = kv1_kv2 + return x2 / x1 + pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), - key=lambda ((x1, _1), (x2, _2)): x2 / x1) + key=div_keys) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. threshold = 2 * len(pairs) / 3 self._data = ( diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index 7c7012c8aea7..b22be1ef2c89 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -20,6 +20,7 @@ For internal use only; no backwards-compatibility guarantees. """ + import collections import inspect import sys @@ -86,7 +87,7 @@ def wrapper(self, method, args, kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within ParDo(%s): ' '%s' % (self.full_label, e)) - six.reraise(TypeCheckError, error_msg, sys.exc_info()[2]) + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) else: return self._check_type(result) @@ -175,12 +176,12 @@ def _type_check(self, type_constraint, datum, is_input): try: check_constraint(type_constraint, datum) except CompositeTypeHintError as e: - six.reraise(TypeCheckError, e.args[0], sys.exc_info()[2]) + six.raise_from(TypeCheckError(e.message), sys.exc_info()[2]) except SimpleTypeHintError: error_msg = ("According to type-hint expected %s should be of type %s. " "Instead, received '%s', an instance of type %s." % (datum_type, type_constraint, datum, type(datum))) - six.reraise(TypeCheckError, error_msg, sys.exc_info()[2]) + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) class TypeCheckCombineFn(core.CombineFn): @@ -205,7 +206,7 @@ def add_input(self, accumulator, element, *args, **kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within %s: ' '%s' % (self._label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) return self._combinefn.add_input(accumulator, element, *args, **kwargs) def merge_accumulators(self, accumulators, *args, **kwargs): @@ -220,7 +221,7 @@ def extract_output(self, accumulator, *args, **kwargs): except TypeCheckError as e: error_msg = ('Runtime type violation detected within %s: ' '%s' % (self._label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] + six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2]) return result diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index 2994adc0aa5f..70ebcb3a3ff1 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -1046,15 +1046,17 @@ def test_positional_arg_hints(self): _positional_arg_hints(['x', 'y'], {'x': int})) def test_getcallargs_forhints(self): - func = lambda a, (b, c), *d: None + def func(a, b_c, *d): + b, c = b_c # pylint: disable=unused-variable + return None self.assertEquals( - {'a': Any, 'b': Any, 'c': Any, 'd': Tuple[Any, ...]}, + {'a': Any, 'b_c': Any, 'd': Tuple[Any, ...]}, getcallargs_forhints(func, *[Any, Any])) self.assertEquals( - {'a': Any, 'b': Any, 'c': Any, 'd': Tuple[Any, ...]}, + {'a': Any, 'b_c': Any, 'd': Tuple[Any, ...]}, getcallargs_forhints(func, *[Any, Any, Any, int])) self.assertEquals( - {'a': int, 'b': str, 'c': Any, 'd': Tuple[Any, ...]}, + {'a': int, 'b_c': Tuple[str, Any], 'd': Tuple[Any, ...]}, getcallargs_forhints(func, *[int, Tuple[str, Any]])) diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 97bd03798509..d1d1fa2f371e 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -25,6 +25,7 @@ needed right now use a @retry.no_retries decorator. """ + import logging import random import sys @@ -187,7 +188,7 @@ def wrapper(*args, **kwargs): sleep_interval = next(retry_intervals) except StopIteration: # Re-raise the original exception since we finished the retries. - six.reraise(exn, None, exn_traceback) # pylint: disable=raising-bad-type + six.raise_from(exn, exn_traceback) logger( 'Retry with exponential backoff: waiting for %s seconds before ' diff --git a/sdks/python/run_mini_py3lint.sh b/sdks/python/run_mini_py3lint.sh new file mode 100755 index 000000000000..f5e9acd70b3a --- /dev/null +++ b/sdks/python/run_mini_py3lint.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script will run python3 ready style checks +# +# Currently only flake8 E999 +# +# The exit-code of the script indicates success or a failure. + +set -o errexit +set -o pipefail + +MODULE=apache_beam + +usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; } + +if test $# -gt 0; then + case "$@" in + --help) usage; exit 1;; + *) MODULE="$*";; + esac +fi + +echo "Running flake8 for module $MODULE:" +flake8 $MODULE --count --select=E999 --show-source --statistics diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index d2d424e28d9e..8696a8272d4e 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -62,6 +62,8 @@ echo "Running pylint for module $MODULE:" pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE" echo "Running pycodestyle for module $MODULE:" pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE" +echo "Running flake8 for module $MODULE:" +flake8 $MODULE --count --select=E999 --show-source --statistics echo "Running isort for module $MODULE:" # Skip files where isort is behaving weirdly ISORT_EXCLUDED=( @@ -69,9 +71,11 @@ ISORT_EXCLUDED=( "avroio_test.py" "datastore_wordcount.py" "datastoreio_test.py" + "hadoopfilesystem.py" "iobase_test.py" "fast_coders_test.py" "slow_coders_test.py" + "vcfio.py" ) SKIP_PARAM="" for file in "${ISORT_EXCLUDED[@]}"; do diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 31bcc9cdb572..857e7b0e571f 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -17,7 +17,8 @@ [tox] # new environments will be excluded by default unless explicitly added to envlist. -envlist = py27,py27gcp,py27cython,lint,docs +# TODO (after BEAM-3671) add lint_py3 back in. +envlist = py27,py27gcp,py27cython,lint_py2,docs toxworkdir = {toxinidir}/target/.tox [pycodestyle] @@ -90,13 +91,14 @@ commands = python setup.py test passenv = TRAVIS* -[testenv:lint] +[testenv:lint_py2] deps= nose==1.3.7 pycodestyle==2.3.1 pylint==1.7.2 future==0.16.0 isort==4.2.15 + flake8==3.5.0 whitelist_externals=time commands = python --version @@ -105,6 +107,21 @@ commands = time {toxinidir}/run_pylint.sh passenv = TRAVIS* +[testenv:lint_py3] +deps= + nose==1.3.7 + pycodestyle==2.3.1 + pylint==1.7.2 + future==0.16.0 + isort==4.2.15 + flake8==3.5.0 +whitelist_externals=time +commands = + time pip install -e .[test] + time {toxinidir}/run_mini_py3lint.sh +passenv = TRAVIS* + + [testenv:docs] deps= nose==1.3.7