-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[BEAM-4003] Futurize runners subpackage #5373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Run Python Dataflow ValidatesRunner |
tvalentyn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @Fematich . A few comments below, also we need to rebase this change based on current head.
| args, kwargs = transform.raw_side_inputs | ||
| args_to_check = itertools.chain(args, | ||
| kwargs.values()) | ||
| list(kwargs.values())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While technically this makes equivalent interpretation in Py2 and Py3, I think we can safely skip list conversion here in favor of cleaner code.
| views_string = (', '.join(str(elm) for elm in self._views.values()) | ||
| if self._views.values() else '[]') | ||
| views_string = (', '.join(str(elm) for elm in list(self._views.values())) | ||
| if list(self._views.values()) else '[]') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While technically this makes equivalent interpretation in Py2 and Py3, I think we can safely skip list conversion here in favor of cleaner code.
| bundles = [] | ||
| bundle = None | ||
| for encoded_k, vs in self.gbk_items.iteritems(): | ||
| for encoded_k, vs in self.gbk_items.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we learned in #5053, Cython can translate iteration through items() without an intermediate object given a directive to interpret the code using Python3 semantics. However when the code is not cythonized, I would prefer to use future.util.iteritems() avoid impact on efficiency for Python 2. It's probably ok to use items() in tests, or when it's clear that the collection is small in size.
| return False | ||
| if self.context != IGNORED: | ||
| for key, name in self.context.iteritems(): | ||
| for key, name in self.context.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use future.utils.iteritems() here.
| # Now we create the MetricResult elements. | ||
| result = [] | ||
| for metric_key, metric in metrics_by_name.iteritems(): | ||
| for metric_key, metric in metrics_by_name.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use future.utils.iteritems() here.
|
|
||
| """Python worker logging.""" | ||
|
|
||
| from __future__ import absolute_import |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add # cython: language_level=3, since this file will be cythonized.
| from builtins import object | ||
| from builtins import zip | ||
|
|
||
| import six |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should we avoid import six for consistency with the approach followed elsewhere.
What do you think, @RobbeSneyders ?
Looks like we are using six.reraise in a few places and six.text_type in apiclient.py.
| """Counters collect the progress of the Worker for reporting to the service.""" | ||
|
|
||
| from __future__ import absolute_import | ||
| from __future__ import division |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add # cython: language_level=3, since this file will be cythonized.
| @@ -22,8 +22,13 @@ | |||
| For internal use only; no backwards-compatibility guarantees. | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this file will be cythonized, let's tell cython to use Python3 semantics and add:
# cython: language_level=3,
See also:
Line 168 in 7c3fba0
| 'apache_beam/**/*.pyx', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has some interesting consequences for typechecks on strings.
if not isinstance(tag, six.string_types):cannot be replaced by
if not isinstance(tag, (str, unicode)):Since the string objects defined in other modules (in Python2) are seen as bytes in Cython code with Python3 semantics.
Therefore I replaced it by
try:
basestring
except NameError:
basestring = strand
if not isinstance(tag,basestring):| i, buckets[i], | ||
| 10 * total_runs / i, | ||
| buckets[i] / (10.0 * total_runs / i))) | ||
| buckets[i] // (10.0 * total_runs / i))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
|
@tvalentyn and @charlesccychen : all the comments have been addressed and the PR has been rebased now |
| _worker_id = os.environ['WORKER_ID'] if os.environ.has_key( | ||
| 'WORKER_ID') else str(uuid.uuid4()) | ||
| _worker_id = os.environ['WORKER_ID'] if 'WORKER_ID' in os.environ else \ | ||
| str(uuid.uuid4()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OUCH... Backshash is a bad idea (see PEP8). One space character to the right of the backslach and the script breaks on a change that is not visible to the reader. What about:
- _worker_id = os.environ.get('WORKER_ID', str(uuid.uuid4())) instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1.
cclauss
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are all the absolute_imports really essential?
charlesccychen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
| _worker_id = os.environ['WORKER_ID'] if os.environ.has_key( | ||
| 'WORKER_ID') else str(uuid.uuid4()) | ||
| _worker_id = os.environ['WORKER_ID'] if 'WORKER_ID' in os.environ else \ | ||
| str(uuid.uuid4()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1.
| from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler | ||
| from apache_beam.runners.worker.sdk_worker import SdkHarness | ||
|
|
||
| standard_library.install_aliases() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to call this before we do the import of "http.server" above? It looks like this only works because of an "accident" in that this module is imported after someone else already called install_aliases().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order should be OK, see: #5373 (comment)
| from apache_beam.runners.worker import data_plane | ||
| from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor | ||
|
|
||
| standard_library.install_aliases() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to call this before we do the import of "queue" above? It looks like this only works because of an "accident" in that this module is imported after someone else already called install_aliases().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order should be OK, see: #5373 (comment)
|
|
||
| from apache_beam.runners.worker import logger | ||
|
|
||
| standard_library.install_aliases() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed in this module?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed now!
| from apache_beam.portability.api import beam_fn_api_pb2_grpc | ||
| from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor | ||
|
|
||
| standard_library.install_aliases() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to call this before we do the import of "queue" above? It looks like this only works because of an "accident" in that this module is imported after someone else already called install_aliases().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order should be OK, see: #5373 (comment)
| from apache_beam.transforms import sideinputs | ||
| from apache_beam.utils import counters | ||
|
|
||
| standard_library.install_aliases() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to call this before we do the import of "queue" above? It looks like this only works because of an "accident" in that this module is imported after someone else already called install_aliases().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pylint errors on placing install_aliases() above the imports. That's the reasoning for this location.
I tested functionality of this placement and this order works as well:
test script* (Python2):
Without install_aliases():
>>> import sys
>>> sys.intern
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'module' object has no attribute 'intern'
With install_aliases() after import sys
>>> import sys
>>> from future import standard_library
>>> standard_library.install_aliases()
>>> sys.intern
<built-in function intern>
*Based on documented effect of aliased imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
from future.moves.queue import Queue
| return False | ||
| if self.context != IGNORED: | ||
| for key, name in self.context.iteritems(): | ||
| for key, name in iteritems(self.context): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why iteritems here vs .items()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid efficiency losses in Python2, see: #5373 (comment)
| import time | ||
| from datetime import datetime | ||
| from StringIO import StringIO | ||
| from io import BytesIO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In some of your other pending changes you use io.BytesIO. Can we choose one style and be consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any guidelines on the style? If not, I'll make it consistent with the io.BytesIO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think people usually use it in a qualified way, so +1 for io.BytesIO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check!
| from apache_beam.utils import proto_utils | ||
| from apache_beam.utils.plugin import BeamPlugin | ||
|
|
||
| standard_library.install_aliases() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this in this file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Introduced by futurize step because of import urllib (documentation). However, since we import urllib from future, this is indeed not required anymore
| from apitools.base.py import exceptions | ||
| import six | ||
|
|
||
| from future import standard_library |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to call this before we do the import of "io" above? It looks like this only works because of an "accident" in that this module is imported after someone else already called install_aliases().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not believe that you can just write from future import standard_library. I believe that you must write from future.standard_library import install_aliases ; install_aliases() and (despite what the linters will tell you) the install_aliases() call must happen before you import effected modules. If I am wrong on this then please enlighten me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed in other PR, io has been added in Python 2.6, so we don't need to install_aliases to import it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I'll remove the install_aliases caused by io. However regarding the order, it seems that install_aliases doesn't need to be called before the imports (copied from here):
I tested functionality of this placement and this order works as well:
test script* (Python2):
Without install_aliases():
>>> sys.intern
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'module' object has no attribute 'intern'
With install_aliases() after import sys
>>> from future import standard_library
>>> standard_library.install_aliases()
>>> sys.intern
<built-in function intern>
*Based on documented effect of aliased imports
|
Thank you! LGTM. (ccy-benchmark-ok) |
This pull request prepares the runners subpackage for Python 3 support. This PR is part of a series in which all subpackages will be updated using the same approach.
This approach has been documented here and the first pull request in the series (Futurize coders subpackage) demonstrating this approach can be found at #5053.
R: @aaltay @tvalentyn @RobbeSneyders