4 changes: 2 additions & 2 deletions sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -192,8 +192,8 @@ def expand(self, user_scores):
         sum_scores
         # Use the derived mean total score (global_mean_score) as a side input.
         | 'ProcessAndFilter' >> beam.Filter(
-            lambda (_, score), global_mean:\
-                score > global_mean * self.SCORE_WEIGHT,
+            lambda key_score, global_mean:\
+                key_score[1] > global_mean * self.SCORE_WEIGHT,
             global_mean_score))
     return filtered
     # [END abuse_detect]
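For context: PEP 3113 removed tuple parameters from function and lambda signatures, so the old form above is a SyntaxError on Python 3. A minimal standalone illustration (not part of the PR):

# Python 2 only -- tuple parameter unpacking, a SyntaxError on Python 3:
#   key_fn = lambda (_, score): score
# Portable form: accept the tuple whole and index into it.
key_fn = lambda key_score: key_score[1]

assert key_fn(('user_3', 42)) == 42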
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/examples/complete/tfidf.py
@@ -147,14 +147,18 @@ def compute_term_frequency(uri_count_and_total):
 #
 # This calculation uses a side input, a Dataflow-computed auxiliary value
 # presented to each invocation of our MapFn lambda. The second argument to
-# the lambda (called total---note that we are unpacking the first argument)
+# the function (called total---note that the first argument is a tuple)
 # receives the value we listed after the lambda in Map(). Additional side
Member: This comment block is referring to the removed lambda.

 # inputs (and ordinary Python values, too) can be provided to MapFns and
 # DoFns in this way.
+def div_word_count_by_total(word_count, total):
+  (word, count) = word_count
Contributor: Optional: You don't need parentheses in the unpacking or the packing into tuples.

+  return (word, float(count) / total)
+
 word_to_df = (
     word_to_doc_count
     | 'ComputeDocFrequencies' >> beam.Map(
-        lambda (word, count), total: (word, float(count) / total),
+        div_word_count_by_total,
         AsSingleton(total_documents)))

 # Join the term frequency and document frequency collections,
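The same pattern in isolation, as a hedged sketch (invented names, not PR code; assumes apache_beam is installed): a named function replaces the tuple-unpacking lambda, and the singleton side input arrives as the second positional argument.

import apache_beam as beam
from apache_beam.pvalue import AsSingleton

def div_by_total(word_count, total):
  word, count = word_count  # unpack in the body, not in the signature
  return (word, float(count) / total)

with beam.Pipeline() as p:
  counts = p | 'Counts' >> beam.Create([('cat', 2), ('dog', 6)])
  total_pc = p | 'Total' >> beam.Create([8])
  # AsSingleton(total_pc) is delivered as div_by_total's second argument.
  fractions = counts | 'Divide' >> beam.Map(div_by_total, AsSingleton(total_pc))
  fractions | 'Print' >> beam.Map(print)  # Python 3 (print as a function)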
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/common.py
@@ -25,6 +25,8 @@
 import sys
 import traceback

+import six
+
 from apache_beam.internal import util
 from apache_beam.metrics.execution import ScopedMetricsContainer
 from apache_beam.pvalue import TaggedOutput
@@ -512,7 +514,7 @@ def _reraise_augmented(self, exn):
         traceback.format_exception_only(type(exn), exn)[-1].strip()
         + step_annotation)
     new_exn._tagged_with_step = True
-    raise new_exn, None, original_traceback
+    six.raise_from(new_exn, original_traceback)
Contributor: This use of raise_from instead of reraise led to a bug with dropped stacktraces: https://issues.apache.org/jira/projects/BEAM/issues/BEAM-3956
Are the other uses of raise_from instead of reraise in this PR appropriate?
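The distinction behind that report, sketched outside Beam (hypothetical snippet; the raising calls are left commented so it runs as-is): six.reraise re-raises a captured exception with its original traceback, whereas six.raise_from raises a new exception and, on Python 2, simply discards the cause.

import sys

import six

def fail():
  raise ValueError('original failure')

try:
  fail()
except ValueError:
  exc_info = sys.exc_info()

# Keeps the traceback pointing into fail():
#   six.reraise(*exc_info)
# Raises a fresh exception instead; on Python 2 raise_from is essentially
# `raise value`, so the stack trace into fail() is dropped (cf. BEAM-3956):
#   six.raise_from(RuntimeError('wrapped'), exc_info[1])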



class OutputProcessor(object):
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/direct/executor.py
@@ -28,6 +28,8 @@
 import traceback
 from weakref import WeakValueDictionary

+import six
+
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer

@@ -398,7 +400,7 @@ def await_completion(self):
     try:
       if update.exception:
         t, v, tb = update.exc_info
-        raise t, v, tb
+        six.reraise(t, v, tb)
     finally:
       self.executor_service.shutdown()
       self.executor_service.await_completion()
@@ -1162,7 +1162,7 @@ def run(self):
         self._latest_progress = progress_result.process_bundle_progress
         if self._callback:
           self._callback(self._latest_progress)
-    except Exception, exn:
+    except Exception as exn:
       logging.error("Bad progress: %s", exn)
     time.sleep(self._frequency)

@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import print_function
+
 import functools
 import logging
 import time
@@ -218,7 +220,7 @@ def test_progress_metrics(self):
         m_out.processed_elements.measured.output_element_counts['twice'])

     except:
-      print res._metrics_by_stage
+      print(res._metrics_by_stage)
Member: let's use logging. logging.info perhaps?

Contributor Author: I'm not sure; since this is done in a test, I think printing might actually be the right behaviour.

Contributor: Yes, printing is fine. It's just so that if this test fails, there is some context.

       raise

 # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import print_function

 import logging
 import platform
@@ -41,13 +42,13 @@ def setUp(self):
   if platform.system() != 'Windows':
     def handler(signum, frame):
       msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
-      print '=' * 20, msg, '=' * 20
+      print('=' * 20, msg, '=' * 20)
       traceback.print_stack(frame)
       threads_by_id = {th.ident: th for th in threading.enumerate()}
       for thread_id, stack in sys._current_frames().items():
         th = threads_by_id.get(thread_id)
-        print
-        print '# Thread:', th or thread_id
+        print()
+        print('# Thread:', th or thread_id)
         traceback.print_stack(stack)
       raise BaseException(msg)
     signal.signal(signal.SIGALRM, handler)
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_plane.py
@@ -29,6 +29,7 @@
 import threading

 import grpc
+import six

 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_fn_api_pb2
@@ -182,7 +183,8 @@ def input_elements(self, instruction_id, expected_targets):
       data = received.get(timeout=1)
     except queue.Empty:
       if self._exc_info:
-        raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
+        t, v, tb = self._exc_info
+        six.reraise(t, v, tb)
     else:
       if not data.data and data.target in expected_targets:
         done_targets.append(data.target)
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -28,6 +28,7 @@
 from concurrent import futures

 import grpc
+import six

 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -50,7 +51,7 @@ def call_fn():
     thread.join(timeout_secs)
     if exc_info:
       t, v, tb = exc_info  # pylint: disable=unbalanced-tuple-unpacking
Member: Do we still need disable=unbalanced-tuple-unpacking? We do not have it in data_plane.py:186.

Contributor Author: I'm not sure; either way it's probably unrelated to the flake8 changes. I think doing a follow-up sweep of the linter disable statements could be a good starter task for someone later?

-      raise t, v, tb
+      six.reraise(t, v, tb)
   assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs
   return wrapper
 return decorate
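A self-contained sketch (hypothetical helper, not Beam code) of the pattern this decorator and the data_plane change share: capture sys.exc_info() in the worker thread, then six.reraise it in the waiting thread so the failure surfaces with its original traceback.

import sys
import threading

import six

def run_and_propagate(target):
  exc_info = []

  def worker():
    try:
      target()
    except Exception:  # pylint: disable=broad-except
      exc_info[:] = sys.exc_info()

  thread = threading.Thread(target=worker)
  thread.start()
  thread.join()
  if exc_info:
    six.reraise(*exc_info)  # re-raised here, traceback intact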
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -28,6 +28,7 @@
 from concurrent import futures

 import grpc
+import six

 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -288,7 +289,8 @@ def _blocking_request(self, request):
     self._requests.put(request)
     while not future.wait(timeout=1):
       if self._exc_info:
-        raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+        t, v, tb = self._exc_info
+        six.reraise(t, v, tb)
       elif self._done:
         raise RuntimeError()
     del self._responses_by_id[request.id]
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/transforms/util.py
@@ -259,8 +259,13 @@ def _thin_data(self):
     sorted_data = sorted(self._data)
     odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else []
     # Sort the pairs by how different they are.
+
+    def div_keys(kv1_kv2):
+      (x1, _), (x2, _) = kv1_kv2
+      return x2 / x1
+
     pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]),
-                   key=lambda ((x1, _1), (x2, _2)): x2 / x1)
+                   key=div_keys)
     # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
     threshold = 2 * len(pairs) / 3
     self._data = (
11 changes: 6 additions & 5 deletions sdks/python/apache_beam/typehints/typecheck.py
@@ -20,6 +20,7 @@
 For internal use only; no backwards-compatibility guarantees.
 """

+
 import collections
 import inspect
 import sys
@@ -86,7 +87,7 @@ def wrapper(self, method, args, kwargs):
       except TypeCheckError as e:
         error_msg = ('Runtime type violation detected within ParDo(%s): '
                      '%s' % (self.full_label, e))
-        six.reraise(TypeCheckError, error_msg, sys.exc_info()[2])
+        six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2])
       else:
         return self._check_type(result)

@@ -175,12 +176,12 @@ def _type_check(self, type_constraint, datum, is_input):
     try:
       check_constraint(type_constraint, datum)
     except CompositeTypeHintError as e:
-      six.reraise(TypeCheckError, e.args[0], sys.exc_info()[2])
+      six.raise_from(TypeCheckError(e.message), sys.exc_info()[2])
     except SimpleTypeHintError:
       error_msg = ("According to type-hint expected %s should be of type %s. "
                    "Instead, received '%s', an instance of type %s."
                    % (datum_type, type_constraint, datum, type(datum)))
-      six.reraise(TypeCheckError, error_msg, sys.exc_info()[2])
+      six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2])


class TypeCheckCombineFn(core.CombineFn):
@@ -205,7 +206,7 @@ def add_input(self, accumulator, element, *args, **kwargs):
     except TypeCheckError as e:
       error_msg = ('Runtime type violation detected within %s: '
                    '%s' % (self._label, e))
-      raise TypeCheckError, error_msg, sys.exc_info()[2]
+      six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2])
     return self._combinefn.add_input(accumulator, element, *args, **kwargs)

   def merge_accumulators(self, accumulators, *args, **kwargs):
@@ -220,7 +221,7 @@ def extract_output(self, accumulator, *args, **kwargs):
     except TypeCheckError as e:
       error_msg = ('Runtime type violation detected within %s: '
                    '%s' % (self._label, e))
-      raise TypeCheckError, error_msg, sys.exc_info()[2]
+      six.raise_from(TypeCheckError(error_msg), sys.exc_info()[2])
     return result


10 changes: 6 additions & 4 deletions sdks/python/apache_beam/typehints/typehints_test.py
@@ -1046,15 +1046,17 @@ def test_positional_arg_hints(self):
         _positional_arg_hints(['x', 'y'], {'x': int}))

   def test_getcallargs_forhints(self):
-    func = lambda a, (b, c), *d: None
+    def func(a, b_c, *d):
+      b, c = b_c  # pylint: disable=unused-variable
+      return None
     self.assertEquals(
-        {'a': Any, 'b': Any, 'c': Any, 'd': Tuple[Any, ...]},
+        {'a': Any, 'b_c': Any, 'd': Tuple[Any, ...]},
         getcallargs_forhints(func, *[Any, Any]))
     self.assertEquals(
-        {'a': Any, 'b': Any, 'c': Any, 'd': Tuple[Any, ...]},
+        {'a': Any, 'b_c': Any, 'd': Tuple[Any, ...]},
         getcallargs_forhints(func, *[Any, Any, Any, int]))
     self.assertEquals(
-        {'a': int, 'b': str, 'c': Any, 'd': Tuple[Any, ...]},
+        {'a': int, 'b_c': Tuple[str, Any], 'd': Tuple[Any, ...]},
         getcallargs_forhints(func, *[int, Tuple[str, Any]]))
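The renamed keys in the expected dicts follow directly from PEP 3113: with no nested parameter, a hint has nothing to bind to except the single b_c argument, which now carries the whole Tuple hint. The stdlib behaves the same way (a quick sketch with invented values, not Beam code):

import inspect

def func(a, b_c, *d):
  b, c = b_c  # unpacking now happens in the body
  return b, c

# The whole tuple binds to the one named parameter:
print(inspect.getcallargs(func, 1, ('x', 2), 3, 4))
# -> {'a': 1, 'b_c': ('x', 2), 'd': (3, 4)}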


3 changes: 2 additions & 1 deletion sdks/python/apache_beam/utils/retry.py
@@ -25,6 +25,7 @@
 needed right now use a @retry.no_retries decorator.
 """

+
 import logging
 import random
 import sys
@@ -187,7 +188,7 @@ def wrapper(*args, **kwargs):
         sleep_interval = next(retry_intervals)
       except StopIteration:
         # Re-raise the original exception since we finished the retries.
-        six.reraise(exn, None, exn_traceback)  # pylint: disable=raising-bad-type
+        six.raise_from(exn, exn_traceback)

       logger(
           'Retry with exponential backoff: waiting for %s seconds before '
40 changes: 40 additions & 0 deletions sdks/python/run_mini_py3lint.sh
@@ -0,0 +1,40 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This script will run python3-ready style checks
#
# Currently only flake8 E999
#
# The exit-code of the script indicates success or a failure.

set -o errexit
set -o pipefail

MODULE=apache_beam

usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; }

if test $# -gt 0; then
case "$@" in
--help) usage; exit 1;;
*) MODULE="$*";;
esac
fi

echo "Running flake8 for module $MODULE:"
flake8 $MODULE --count --select=E999 --show-source --statistics
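E999 is flake8's cannot-parse error code, so running the check under a Python 3 interpreter flags exactly the constructs this PR removes. A small sketch (hypothetical, runs on Python 3) of the kind of source it reports:

# Valid Python 2, but a parse error -- and hence an E999 -- on Python 3.
PY2_ONLY_SOURCE = "lambda (a, b): a + b"

try:
  compile(PY2_ONLY_SOURCE, '<example>', 'eval')
except SyntaxError as exn:
  print('E999-style failure: %s' % exn)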
Member: Why do we need a separate file? I see that we are running the same thing in run_pylint.sh already.

Contributor Author: So many of the current linters don't pass when run in a Py3 env. We can copy them over one at a time.

Member: Sounds good.
Optional: We don't need mini in the name.

4 changes: 4 additions & 0 deletions sdks/python/run_pylint.sh
@@ -62,16 +62,20 @@ echo "Running pylint for module $MODULE:"
 pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
 echo "Running pycodestyle for module $MODULE:"
 pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE"
+echo "Running flake8 for module $MODULE:"
+flake8 $MODULE --count --select=E999 --show-source --statistics
 echo "Running isort for module $MODULE:"
 # Skip files where isort is behaving weirdly
 ISORT_EXCLUDED=(
   "apiclient.py"
   "avroio_test.py"
   "datastore_wordcount.py"
   "datastoreio_test.py"
+  "hadoopfilesystem.py"
Member: Why do we need exceptions for a growing list of files?

@ehudm specifically for this file.

Member: Just saw this (my username is udim). Was wondering why this file was added to this list?

"iobase_test.py"
"fast_coders_test.py"
"slow_coders_test.py"
"vcfio.py"
)
 SKIP_PARAM=""
 for file in "${ISORT_EXCLUDED[@]}"; do
21 changes: 19 additions & 2 deletions sdks/python/tox.ini
@@ -17,7 +17,8 @@

 [tox]
 # new environments will be excluded by default unless explicitly added to envlist.
-envlist = py27,py27gcp,py27cython,lint,docs
+# TODO (after BEAM-3671) add lint_py3 back in.
+envlist = py27,py27gcp,py27cython,lint_py2,docs
 toxworkdir = {toxinidir}/target/.tox

 [pycodestyle]
@@ -90,13 +91,14 @@ commands =
     python setup.py test
 passenv = TRAVIS*

-[testenv:lint]
Member: Why was the name changed?
Please revert or update https://beam.apache.org/contribute/contribution-guide/

Contributor Author: Sure, I'll ping you on the PR for that.

+[testenv:lint_py2]
 deps=
   nose==1.3.7
   pycodestyle==2.3.1
   pylint==1.7.2
   future==0.16.0
   isort==4.2.15
+  flake8==3.5.0
 whitelist_externals=time
 commands =
   python --version
@@ -105,6 +107,21 @@ commands =
   time {toxinidir}/run_pylint.sh
 passenv = TRAVIS*

+[testenv:lint_py3]
+deps=
+  nose==1.3.7
+  pycodestyle==2.3.1
+  pylint==1.7.2
+  future==0.16.0
+  isort==4.2.15
+  flake8==3.5.0
+whitelist_externals=time
+commands =
+  time pip install -e .[test]
+  time {toxinidir}/run_mini_py3lint.sh
Member: Do we really need two lint environments? We run this command in the single lint environment.

Contributor Author: We need a new one for Python3 linting.

Contributor Author: To be clear, flake8 only catches the E999 issues when run in Python3.

+passenv = TRAVIS*


 [testenv:docs]
 deps=
   nose==1.3.7