[BEAM-4006] Futurize transforms subpackage #5729
Conversation
superbobry left a comment
General question: does pre-commit on Jenkins run whitelisted unittests on Python 3?
  import itertools
  import random
  import unittest
+ from builtins import range
from future.builtins import range?
  avg_size_per_value = self._total_size // len(self._serialized_values)
  num_values_per_split = max(
-     int(desired_bundle_size / avg_size_per_value), 1)
+     int(desired_bundle_size // avg_size_per_value), 1)
No need for an int call?
We still need to coerce it into an int; e.g. 4 // 7.0 has value 0.0 but type float.
Good point. I didn't realise that avg_size_per_value could be a float.
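To make the failure mode concrete (plain Python, not Beam code): floor division preserves the float-ness of its operands, so the value can be integral while the type is still float.

# With a float operand, // truncates the value but returns a float:
print(4 // 7.0)        # 0.0
print(type(4 // 7.0))  # <class 'float'>

# Hence the explicit cast before using the result as an element count:
num_values_per_split = max(int(4 // 7.0), 1)
print(num_values_per_split)  # 1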
  def known_windows(self):
-   return self.window_ids.keys()
+   return list(self.window_ids.keys())
list(self.window_ids)?
  return NotImplemented

+ def __hash__(self):
+   return hash(self)
This will recurse infinitely, I think.
Exactly thx! Fixed now with: return hash(tuple(self))
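A minimal sketch of the bug and the fix (illustrative class, not the actual Beam one): hash(self) re-enters __hash__ and never terminates, while hashing a tuple of the instance's elements does not.

class WindowLike(object):  # hypothetical stand-in for the real class
    def __init__(self, *elements):
        self.elements = elements

    def __iter__(self):
        return iter(self.elements)

    def __hash__(self):
        # return hash(self)       # would call __hash__ again -> RecursionError
        return hash(tuple(self))  # hashes the elements instead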
  def _extract_input_pvalues(self, pvalueish):
    try:
      # If this works, it's a dict.
Out of scope for this PR, but I'm curious: why not just do an isinstance check?
  def __eq__(self, other):
-   raise NotImplementedError
+   return self.cmp(other) == 0
Consider using a total_ordering backport as in TimestampedValue.
By only implementing 2 methods (as in the example of TimestampedValue),
test_sessions_after_all (apache_beam.transforms.trigger_test.TriggerTest) didn't run
  def __hash__(self):
    return hash(self)

+ # def __lt__(self, other):
Leftover code?
  def __hash__(self):
-   return hash(self.end)
+   return hash(self)
This will recurse infinitely as well.
- def __cmp__(self, other):
+ def __eq__(self, other):
+   return (type(self) == type(other)) and (self.value == other.value) and \
Nitpick: parens are not required here.
- if type(self) is not type(other):
-   return cmp(type(self), type(other))
- return cmp((self.value, self.timestamp), (other.value, other.timestamp))
+ return type(self) < type(other)
I fear this might fail since types are not comparable in Python 3 unless you use a custom metaclass (which is not the case here).
Indeed! Using the hash of type should resolve this issue.
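A quick demonstration of both halves of this exchange (two throwaway classes; plain Python 3):

class A(object): pass
class B(object): pass

try:
    A < B  # fine on Python 2, TypeError on Python 3
except TypeError as e:
    print(e)  # '<' not supported between instances of 'type' and 'type'

# Hashes of types are plain ints, so they are always comparable
# (an arbitrary but usable ordering):
print(hash(A) < hash(B) or hash(B) < hash(A))  # True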
charlesccychen left a comment
Thanks!
  self.assertEquals(
      set(['from_dictionary', 'get_all_options', 'slices', 'style',
-          'view_as', 'display_data']),
+          'view_as', 'display_data', 'next']),
This is fine, but was there a particular reason it was added?
This is because of the import from builtins import object in apache_beam/transforms/display.py. This import adds an alias next = __next__ for Python 2 and Python 3 compatibility. PipelineOptions (the class under test here) inherits from the HasDisplayData class defined in the display.py module.
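A sketch of that mechanism (python-future's builtins module; the Counter class is illustrative):

from builtins import object  # python-future compatibility base class

class Counter(object):
    def __init__(self, n):
        self.i, self.n = 0, n

    def __iter__(self):
        return self

    def __next__(self):  # Python 3 iterator protocol
        if self.i >= self.n:
            raise StopIteration
        self.i += 1
        return self.i

print(list(Counter(3)))  # [1, 2, 3] on both Python 2 and Python 3
# On Python 2 the backported object class aliases next = __next__, which is
# why 'next' shows up as an extra attribute of classes deriving from it.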
  def match(actual):
    equal_to([1])([len(actual)])
-   equal_to(pairs)(actual[0].iteritems())
+   equal_to(pairs)(iteritems(actual[0]))
We can just use .items() here.
| """ | ||
| super(Create, self).__init__() | ||
| if isinstance(value, string_types): | ||
| if isinstance(value, (unicode, str)): |
Should we add bytes here?
Based on the requirements of value, I think we should actually check whether value is an iterable: hasattr(values, '__iter__'), which also fails for string_types.
The intention of this line is to prohibit string types from being returned where we expect an iterable of items. Strings technically are iterable (and return individual characters), so we want to prevent them from being returned accidentally (e.g., a user may intend to return a single string, but we don't want to interpret it as its individual characters). In both Python 2 and Python 3, string types are iterable, so I think we should add bytes to this list of "blacklisted" return types.
I have added bytes to the list! Strings in Python 2 didn't have an __iter__() method; the for-loop functionality was provided by the __getitem__() method. That's the reason I suggested the hasattr(values, '__iter__') attribute check. However, in Python 3 __iter__() is available for strings as well, so the check wouldn't work there.
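A sketch of the resulting guard (names illustrative, not the exact Beam code): reject all string-like types explicitly rather than probing for __iter__.

from past.builtins import unicode  # Py2's unicode; aliases str on Py3

def check_not_string(values):
    # str/unicode/bytes are iterable in both Python versions, but iterating
    # them yields characters (or ints, for Py3 bytes), which is almost never
    # what the user intended when creating a PCollection.
    if isinstance(values, (unicode, str, bytes)):
        raise TypeError('expected an iterable of elements, got a string: %r'
                        % (values,))
    return values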
  items = {k: (v if DisplayDataItem._get_value_type(v) is not None
               else str(v))
-          for k, v in pipeline_options.display_data().items()}
+          for k, v in iteritems(pipeline_options.display_data())}
Does this actually need to change?
I added it for efficiency in Python 2, but this will actually not be called that often, so I'll revert it.
  if (any([isinstance(v, pvalue.PCollection) for v in args]) or
-     any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):
+     any([isinstance(v, pvalue.PCollection) for v in itervalues(kwargs)])):
Can we just do .values()?
This is a bit less optimal in Python 2, but since it's only in the __init__, I'll change it to .values().
  equal_to([expected_elem])([actual_elem])
  equal_to(expected_list)(actual_list)
- equal_to(expected_pairs)(actual_dict.iteritems())
+ equal_to(expected_pairs)(iteritems(actual_dict))
Can we just use .items()?
  equal_to([expected_elem])([actual_elem])
- equal_to(expected_kvs)(actual_dict1.iteritems())
- equal_to(expected_kvs)(actual_dict2.iteritems())
+ equal_to(expected_kvs)(iteritems(actual_dict1))
Can we just use .items()?
  'B-4': {6, 7, 8, 9},
  'B-3': {10, 15, 16},
- }.iteritems()))
+ })))
How about just .items()?
- def __cmp__(self, other):
+ def __eq__(self, other):
+   return (type(self) == type(other)) and (self.value == other.value) and \
Please avoid backslash continuation.
Oops, it looks like there is something wrong with the commit history: a bunch of Java changes are being pulled in. Can you rebase to master, cherry-pick and/or squash everything into one commit?
Force-pushed a317e92 to c386443.
Yes, sorry for that! Should now be cleaned up :-).
  def __eq__(self, other):
-   raise NotImplementedError
+   return self.cmp(other) == 0
It doesn't look like this takes care of the case where other is not of type BoundedWindow.
Now equivalent again to the original code.
  type_eq = type(self) == type(other)
  value_eq = self.value == other.value
  timestamp_eq = self.timestamp == other.timestamp
  return type_eq and value_eq and timestamp_eq
This will not work correctly, as the previous code relied on the short-circuiting behavior of "and". Accessing other.value will not work if the type is not as we expected.
Run Python PostCommit
| """ | ||
|
|
||
| from __future__ import absolute_import | ||
| from __future__ import division |
This change requires further changes in this file. In _BatchSizeEstimator._thin_data on line 285, we need to explicitly cast to int here; otherwise, we get the following error:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 175, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 85, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 403, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 404, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 577, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 602, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 575, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 352, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 85, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 403, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 404, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 577, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 618, in apache_beam.runners.common.DoFnRunner._reraise_augmented
six.reraise(type(new_exn), new_exn, original_traceback)
File "apache_beam/runners/common.py", line 575, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 352, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 651, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/util.py", line 345, in process
yield self._batch
File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/util.py", line 268, in record_time
self._thin_data()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/util.py", line 287, in _thin_data
+ odd_one_out)
TypeError: slice indices must be integers or None or have an __index__ method [while running 'Analyze/RunPhase[0]/BatchAnalyzerInputs/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)']
The threshold variable is now an int, and Python PostCommit succeeds.
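The failure is easy to reproduce in isolation: under from __future__ import division, / yields a float, and a float is not a valid slice index.

from __future__ import division

data = list(range(10))
threshold = 2 * len(data) / 3  # 6.666..., a float under true division
try:
    data[:threshold]
except TypeError as e:
    print(e)  # slice indices must be integers or None or have an __index__ method

print(data[:int(threshold)])  # works once coerced: [0, 1, 2, 3, 4, 5]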
Force-pushed 9ed0884 to f8aea37.
Run Python PostCommit
Unfortunately, this change is seen to produce a ~15% regression in internal Dataflow benchmarks. We have to investigate this regression before merging. CC: @tvalentyn
  class ReiterableNonEmptyAccumulators(object):
    def __iter__(self):
-     return itertools.ifilter(filter_fn, accumulators)
+     return filter(filter_fn, accumulators)
I think this might be a potential source of the performance loss; I'll update this to use ifilter on PY2.
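The guarded import keeps the lazy Python 2 iterator without breaking Python 3 (a sketch of the pattern later adopted in this PR):

try:
    from itertools import ifilter as filter  # Python 2: the lazy variant
except ImportError:
    pass  # Python 3: the built-in filter is already lazy

evens = filter(lambda x: x % 2 == 0, range(10))  # iterator on both versions
print(list(evens))  # [0, 2, 4, 6, 8]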
@charlesccychen and @tvalentyn: is there more detailed info on the benchmarks?
Thank you. Let me test the pipeline with this change. Unfortunately it's not easy to export the benchmark data.
Force-pushed 209f684 to f6dc113.
Unfortunately, the ifilter change here (#5729 (comment)) doesn't fix the regression.
superbobry left a comment
@charlesccychen could you share which subsystems are being benchmarked? The benchmark source code is not part of the beam repo, right?
  from apache_beam.utils import urns

+ try:
+   from itertools import ifilter as filter
How about from future.builtins import filter?

  timestamp_combiner = kwargs.pop('timestamp_combiner', None)
  if kwargs:
-   raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+   raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys()))
Just list(kwargs) will work too.
| """ | ||
| super(Create, self).__init__() | ||
| if isinstance(value, string_types): | ||
| if isinstance(value, (unicode, str, bytes)): |
No need to check for both str and bytes since Python 2.7 defines bytes == str and on Python 3.X unicode == str.
See #5729 (comment). Bytes in Python 3 also shouldn't be allowed, since we don't want to support creation of a PCollection of individual bytes.
  from apache_beam.utils.timestamp import TIME_GRANULARITY

  # AfterCount is experimental. No backwards compatibility guarantees.
+ try:
I think this could be rewritten using future.moves.itertools.
  return self.end.predecessor()

- def __cmp__(self, other):
+ def cmp(self, other):
Consider using @total_ordering as well.
Adds some extra delay
  min(self.start, other.start), max(self.end, other.end))

+ @total_ordering
I wonder if the slowdown is due to the indirection introduced by @total_ordering?
I think it is; I'm working on a version without @total_ordering and without the cmp function call now!
We have confirmed so far by bisection that the slowdown is caused by some changes in util.py and/or window.py, and there are additional benchmark runs in flight to narrow this down further. It is very likely that the slowdown is caused by the time it takes to compare objects of some of the classes defined in window.py, due to changes in the implementation of cmp or hash functions. I also plan to confirm it with a microbenchmark similar to https://github.com/apache/beam/compare/master...tvalentyn:utils_futurization_benchmark?expand=1#diff-de123c6d83f9809a6f0d95be5a7d1826. That could help us get performance metrics for different implementations without running a slow benchmark suite.
I have pushed a new commit that should speed up the compare functions for BoundedWindow and TimestampedValue objects:
- removed the total_ordering decorator
- removed the custom cmp method in BoundedWindow
- removed the if-statement in TimestampedValue, using the short-circuit behavior of or
All of these changes had a positive impact on a small test I used.
A microbenchmark in the Apache Beam code would really be useful indeed :-)!
Force-pushed 2e86095 to c270644.
I did not benchmark changes in c270644 yet, but doing bisection on a previous version of the PR shows that the largest contributor to the regression is the line [...]
@tvalentyn is there anything I can help with? Are you planning to benchmark the changes in c270644? Or do you first want to add more microbenchmarks? I am working on a microbenchmark for TimestampedValue (BEAM-4855).
With the latest round of experiments, we finally got to the bottom of this performance regression, see: https://issues.apache.org/jira/browse/BEAM-4858. I will also put some details inline in util.py.
  key=div_keys)
  # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
- threshold = 2 * len(pairs) / 3
+ threshold = 2 * len(pairs) // 3
Let's use past.utils.division.old_div in line 280 as an exception, and add a TODO(BEAM-4858) comment to clean this up.
I have confirmed that this change brings performance back to the same ballpark.
Perfect! I will update the PR, thx!
Thanks for your patience with this investigation.
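For reference, the agreed escape hatch looks roughly like this (a sketch; old_div reproduces Python 2's / semantics, i.e. floor division for two ints):

from past.utils import old_div

pairs = [(1, 2)] * 7
# TODO(BEAM-4858): remove this Py2-style division once the regression
# around native // is fully understood.
threshold = old_div(2 * len(pairs), 3)
print(threshold)  # 4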
@Fematich I'm taking a look at c270644. I don't believe [...]. Since we now know how to have a Py3-compatible version of this change that performs comparably well, the rest of performance testing won't take much time.
A few comments pertaining to 5b8842b.
  def __hash__(self):
-   return hash(self.end)
+   raise NotImplementedError
What's the reason to change the original behavior of __hash__? Seems like we should revert this change since it makes the objects of this class unhashable.
I removed the implementation to match the __eq__ behavior, which also raises NotImplementedError. This enforces that child classes implement __hash__, and makes it impossible for child classes like GlobalWindow and IntervalWindow objects to have the same hash. Does this make sense, or should I add the __hash__ method back?
I see, that makes sense, thanks. I think it's a good idea to keep raising NotImplementedError as you suggested, since we don't implement __eq__.
Note that if a class that overrides __eq__() needs to retain the implementation of __hash__() from a parent class, the interpreter must be told this explicitly by setting __hash__ = <ParentClass>.__hash__, see: https://docs.python.org/3/reference/datamodel.html#object.__hash__.
OK thanks, and interesting! So the NotImplementedError is only relevant for consistency with __eq__ but doesn't have an impact on child classes. Good to know :-)!
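The rule in miniature (plain Python 3 behavior, not Beam code): defining __eq__ implicitly sets __hash__ to None, and keeping the parent's hash must be explicit.

class Base(object):
    def __hash__(self):
        return 42

class Silenced(Base):
    def __eq__(self, other):
        return True
    # Defining __eq__ implicitly sets __hash__ = None on Python 3.

class Retained(Base):
    def __eq__(self, other):
        return True
    __hash__ = Base.__hash__  # explicit opt-in to keep the parent's hash

try:
    hash(Silenced())
except TypeError as e:
    print(e)             # unhashable type: 'Silenced'
print(hash(Retained()))  # 42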
  def __hash__(self):
    return hash((type(self), self.size, self.offset))

+ def __ne__(self, other):
Looks like classes IntervalWindow, GlobalWindow, SlidingWindows, and Sessions define __eq__, but don't define __ne__. Let's add:

def __ne__(self, other):
    return not self == other

since this would be the default implementation of __ne__ in Python 3. Curious why the conversion tool does not add something similar.
Adding

def __ne__(self, other):
    return not self == other

for IntervalWindow causes test_global_window to fail on the last assertion (comparing IntervalWindow (max-range) to GlobalWindow). To resolve this failed assertion I need to add the type in __eq__:

def __hash__(self):
    return hash((self.start, self.end, type(self)))

def __eq__(self, other):
    return (self.start == other.start
            and self.end == other.end
            and type(self) == type(other))

def __ne__(self, other):
    return not self == other

I think this makes sense and will be necessary for Python 3 compatibility; however, I'm not sure whether this will have a performance impact here?
Agree with you, looking at the test it seems that we are doing the right thing. I think there will not be performance impact here, but I'll do one final A/B test with and without the PR to be safe once we finalize it.
Thanks! I have just pushed the commit with these changes.
  return (cmp(self.end, other.end)
          or cmp(hash(self), hash(other))) != 0

+ def __lt__(self, other):
It seems to be a little faster if we don't pull in cmp. How about we implement the rich comparisons as follows:

# Order first by endpoint, then arbitrarily.  <-- let's mention this comment once
def __op__(self, other):
    if self.end != other.end:
        return self.end $op_symbol other.end
    return hash(self) $op_symbol hash(other)
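Instantiated for one operator pair (a sketch; the real class has more methods than this reduced stand-in), the template reads:

class IntervalWindow(object):  # reduced stand-in for the Beam class
    def __init__(self, start, end):
        self.start, self.end = start, end

    def __hash__(self):
        return hash((self.start, self.end))

    # Order first by endpoint, then arbitrarily.
    def __lt__(self, other):
        if self.end != other.end:
            return self.end < other.end
        return hash(self) < hash(other)

    def __gt__(self, other):
        if self.end != other.end:
            return self.end > other.end
        return hash(self) > hash(other)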
  def __eq__(self, other):
    raise NotImplementedError

+ def __ne__(self, other):
We can also remove cmp here:

def __ne__(self, other):
    return self.end != other.end or hash(self) != hash(other)
- if type(self) is not type(other):
-   return cmp(type(self), type(other))
- return cmp((self.value, self.timestamp), (other.value, other.timestamp))
+ def __eq__(self, other):
I suggest:
- Use @total_ordering.
- Implement __ne__ as return not self == other.
- Implement __lt__ without cmp and tuples, which performs slightly better:

def __lt__(self, other):
    if type(self) != type(other):
        return type(self) < type(other)
    if self.value != other.value:
        return self.value < other.value
    return self.timestamp < other.timestamp
Using total_ordering results in unexpected behavior. Concretely, the test test_reshuffle_windows_unchanged fails.
I have tried to locate the exact cause by implementing all OPs (with the total_ordering decorator in place) and subsequently leaving out the OPs one by one:
- adding the total_ordering decorator itself doesn't introduce issues;
- only using total_ordering to fill in __lt__ works; other combinations always fail;
- I am currently testing the OPs by manually applying the conversion rules defined by total_ordering to see if I can locate the exact problem: __ge__ can be removed and works with functools; __gt__ works as a manual copy of the functools rule from __lt__, but not with functools; __le__ doesn't work as a manual copy of the functools rule from __lt__, however replacing and self == other by and not (self != other) works.

@tvalentyn: Given the note on the performance impact of the total_ordering decorator, it might make sense to implement all OPs instead of using the decorator? That works already; in the meantime I will continue the testing (step 3) to see if I can give more info.
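For context, a minimal example of what @total_ordering synthesizes (standard functools, separate from the flaky test): given __eq__ and __lt__, the decorator derives the remaining comparisons as wrappers, e.g. __le__ as self < other or self == other, which is also where its extra call overhead comes from.

from functools import total_ordering

@total_ordering
class Stamp(object):
    def __init__(self, t):
        self.t = t

    def __eq__(self, other):
        return self.t == other.t

    def __lt__(self, other):
        return self.t < other.t

print(Stamp(1) <= Stamp(2), Stamp(3) > Stamp(2))  # True True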
With this PR, the test becomes flaky, or in other words passes sometimes. It may still flake if we implement all ops manually - did you try running the test multiple times when all ops are implemented?
I don't understand yet what change in behavior triggers this (we should find out), but I think we need to fix the test regardless of this: #6104.
Performance-wise, last week I used master...tvalentyn:transforms_microbenchmark to compare different options, and did not notice a significant difference when using @total_ordering (we could double check), so I favored the decorator to reduce the boilerplate.
Yes, I just retested with the full implementation which seems to work again. However, will be good to test the @total_ordering after your PR has been merged :-).
I checked performance of windowed_value, interval_window, timestamped_value, bounded_window in dictionaries and ordered lists, with and without this PR. For the most part, performance is not changed or improved. @total_ordering does not significantly affect it. Only concern is using hash(type(self)) when evaluating hashes of objects may be unnecessary in most cases, and slightly decreases the performance here: https://github.com/apache/beam/pull/5729/files#diff-d7dfd884622fb59806ba9276cf3bd8fbR242. So I left some more comments to simplify hash functions. The change above was also the trigger for test flakiness, although ultimately the test was at fault.
Without PR:
wv_with_one_window: dict, 10000 element(s) : per element median time cost: 4.71699e-06 sec, relative std: 5.93%
wv_with_multiple_windows: dict, 10000 element(s): per element median time cost: 4.02698e-05 sec, relative std: 0.60%
interval_window: dict, 10000 element(s) : per element median time cost: 1.5276e-06 sec, relative std: 1.78%
timestamped_value: dict, 10000 element(s) : per element median time cost: 1.39499e-07 sec, relative std: 7.44%
interval_window: sorting., 10000 element(s) : per element median time cost: 4.04392e-05 sec, relative std: 0.63%
timestamped_value: sorting., 10000 element(s) : per element median time cost: 1.80363e-05 sec, relative std: 1.35%
bounded_window: sorting., 10000 element(s) : per element median time cost: 4.06633e-05 sec, relative std: 1.26%
With PR (including the change suggested in last iteration).
wv_with_one_window: dict, 10000 element(s) : per element median time cost: 5.047e-06 sec, relative std: 2.16%
wv_with_multiple_windows: dict, 10000 element(s): per element median time cost: 4.0575e-05 sec, relative std: 2.20%
interval_window: dict, 10000 element(s) : per element median time cost: 1.53821e-06 sec, relative std: 2.43%
timestamped_value: dict, 10000 element(s) : per element median time cost: 1.27995e-06 sec, relative std: 6.11%
interval_window: sorting., 10000 element(s) : per element median time cost: 1.83087e-05 sec, relative std: 1.28%
timestamped_value: sorting., 10000 element(s) : per element median time cost: 8.4375e-06 sec, relative std: 2.62%
bounded_window: sorting., 10000 element(s) : per element median time cost: 1.80462e-05 sec, relative std: 3.56%
Run Python Dataflow ValidatesRunner
tvalentyn left a comment
Thanks, @Fematich. Did another pass over the PR, two minor comments.
  if self.sum >= INT64_MAX:
    self.sum -= 2**64
- return self.sum / self.count if self.count else _NAN
+ return self.sum // self.count if self.count else _NAN
Please also make the change in line 266.
  def __ne__(self, other):
    return not self == other

+ def __lt__(self, other):
Since types are not comparable in Python 3, how about we change the implementation to:

def __lt__(self, other):
    if type(self) != type(other):
        return type(self).__name__ < type(other).__name__
    if self.value != other.value:
        return self.value < other.value
    return self.timestamp < other.timestamp
  def __hash__(self):
-   return hash((self.start, self.end))
+   return hash((self.start, self.end, type(self)))
Let's remove type(self) from the tuple.
  return False

+ def __hash__(self):
+   return hash((type(self), self.param_id))
Let's simplify this to hash(self.param_id).
  return False

+ def __hash__(self):
+   return hash((type(self), self.windowfn, self.accumulation_mode,
Since this was not defined before, this will most likely be dead code, and the current implementation may break the contract with __eq__ since it's not taking self._default into account. Let's make it __hash__ = None.
Unfortunately, removing __hash__ here results in TypeError: unhashable type: 'Windowing' for test_top_prefixes.
However, looking at self._default, it seems OK to remove it from the hash, since it's actually a check on the other class variables. Therefore I think __eq__ and __hash__ are still in sync. I removed type(self) as well.
You are right, self._default indeed is defined based on the values of other object attributes included in the hash.
Objects of this class are used in this dictionary: self._obj_to_id[obj] = id
  return type(self) == type(other) and self.count == other.count

+ def __hash__(self):
+   return hash((type(self), self.count))
Let's simplify this to return hash(self.count).
  return type(self) == type(other) and self.underlying == other.underlying

+ def __hash__(self):
+   return hash((type(self), self.underlying))
Let's remove type(self) from the tuple.
  and self.timestamp == other.timestamp)

+ def __hash__(self):
+   return hash((type(self), self.value, self.timestamp))
Let's remove type(self) from the tuple.
  return self.size == other.size and self.offset == other.offset

+ def __hash__(self):
+   return hash((type(self), self.size, self.offset))
Let's remove type(self) from the tuple.
  return not self == other

+ def __hash__(self):
+   return hash((type(self), self.offset, self.period))
Let's remove type(self) from the tuple.
  return not self == other

+ def __hash__(self):
+   return hash((type(self), self.gap_size))
Let's remove type(self) from the tuple.
tvalentyn left a comment
LGTM, thank you.
Thank you @Fematich and all the reviewers!
This pull request prepares the transforms subpackage for Python 3 support. This PR is part of a series in which all subpackages will be updated using the same approach.
This approach has been documented here and the first pull request in the series (Futurize coders subpackage) demonstrating this approach can be found at #5053.
R: @aaltay @tvalentyn @RobbeSneyders @charlesccychen