Skip to content

Conversation

@charlesccychen
Copy link
Contributor

@charlesccychen charlesccychen commented Jan 29, 2018

In the Python DirectRunner, we currently use apply_* overrides to override the operation of the default .expand() operation for certain transforms. For example, GroupByKey has a special implementation in the DirectRunner, so we use an apply_* override hook to replace the implementation of GroupByKey.expand().

However, this strategy has drawbacks. Because this override operation happens eagerly during graph construction, the pipeline graph is specialized and modified before a specific runner is bound to the pipeline's execution. This makes the pipeline graph non-portable and blocks full migration to using the Runner API pipeline representation in the DirectRunner.

By contrast, the SDK's PTransformOverride mechanism allows the expression of matchers that operate on the unspecialized graph, replacing PTransforms as necessary to produce a DirectRunner-specialized pipeline graph for execution.

We therefore want to replace these eager apply_* overrides with PTransformOverrides that operate on the completely constructed graph.

https://issues.apache.org/jira/browse/BEAM-3566

@charlesccychen
Copy link
Contributor Author

R: @robertwb

This is ready for review.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks! Just a couple of comments.


import hamcrest as hc

from apache_beam import Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use import apache_beam as beam, and then beam.Map for consistency with the rest of the codebase.

overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)

# Note that the direct output of ReadStringsFromPubSub will be replaced
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests seem rather brittle. Is there a better way to test this transform application than grabbing the internal source and verifying a couple of properties on it. https://beam.apache.org/documentation/pipelines/test-your-pipeline/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping on this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've addressed this here: #4529 (comment)

ProcessKeyedElementsViaKeyedWorkItemsOverride()]

class CombinePerKeyOverride(PTransformOverride):
def get_matcher(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chamikaramj I'm curious why there isn't just a is_match() method on PTransformOverride. (Or, even simpler, perhpas get_replacement_transform would return None for non-matches.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this promotes developing simple (and possibly lightweight) matchers that might be shared between different overrides. We apply the matchers to every transform in the tree. get_replacement_transform() might be more heavy weight. This is inspired by the Java design.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java (at least Java 7) didn't let you pass methods around as first class objects, thus forcing the design. Even with methods, sharding can happen (e.g. is_match = some_common_utility). We should make the common case the simple one (but probably not this PR). Could you file a bug?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug to replace get_matcher() by is_match() ? I think it's still good to keep is_match() and get_replacement_transform() separate to promote lightweight matching.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. (The advantage of not having separate methods is that you can get rid of the whole need for a class, and just let them be callables (again, not possible in Java), but I'm fine with is_match (or matches).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


overrides = [SplittableParDoOverride(),
ProcessKeyedElementsViaKeyedWorkItemsOverride(),
CombinePerKeyOverride(),]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: only leave trailing commas if the closing is on its own line.

from apache_beam.io.gcp import pubsub
except ImportError:
pass
if pubsub:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Factor this out into a method, and invoke it from the try block.

def curry_combine_fn(fn, args, kwargs):
if not args and not kwargs:
return fn
class _CurriedFn(core.CombineFn):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be a performance regression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we're calling fn(..., *args, **kwargs) anyways, this shouldn't matter. Please ignore.

return fn

return CurriedFn()
return _CurriedFn(fn, args, kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this in an else for symmetry.

Copy link
Contributor Author

@charlesccychen charlesccychen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! PTAL.

overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)

# Note that the direct output of ReadStringsFromPubSub will be replaced
Copy link
Contributor Author

@charlesccychen charlesccychen Feb 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
These tests seem rather brittle. Is there a better way to test this transform application than grabbing the internal source and verifying a couple of properties on it. https://beam.apache.org/documentation/pipelines/test-your-pipeline/

Thanks, and I agree, but the issue is that there really isn't anything to test.

The test here isn't a test of the transform; rather, it was (and still is) testing the behavior of replacing the transform with the correct DirectRunner replacement. The transform itself is just a wrapper with runner-specific overrides.

I changed the test name to reflect the intended target of the test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant https://beam.apache.org/contribute/ptransform-style-guide/#testing-transform-construction-and-validation

If this is about the direct runner, we should put it into the direct runner tests. Best is if we could create a mock/in memory PubSub and make sure this works end-to-end (on any runner).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, as mentioned, fixing these existing tests should not block this PR. Please file a JIRA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessKeyedElementsViaKeyedWorkItemsOverride()]

class CombinePerKeyOverride(PTransformOverride):
def get_matcher(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Yes. (The advantage of not having separate methods is that you can get rid of the whole need for a class, and just let them be callables (again, not possible in Java), but I'm fine with is_match (or matches).

Done.


import hamcrest as hc

from apache_beam import Map
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Use import apache_beam as beam, and then beam.Map for consistency with the rest of the codebase.

Done.


overrides = [SplittableParDoOverride(),
ProcessKeyedElementsViaKeyedWorkItemsOverride(),
CombinePerKeyOverride(),]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Nit: only leave trailing commas if the closing is on its own line.

Done.

def curry_combine_fn(fn, args, kwargs):
if not args and not kwargs:
return fn
class _CurriedFn(core.CombineFn):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Given that we're calling fn(..., *args, **kwargs) anyways, this shouldn't matter. Please ignore.

Acknowledged.

from apache_beam.io.gcp import pubsub
except ImportError:
pass
if pubsub:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Factor this out into a method, and invoke it from the try block.

Done.

return fn

return CurriedFn()
return _CurriedFn(fn, args, kwargs)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Put this in an else for symmetry.

Done.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)

# Note that the direct output of ReadStringsFromPubSub will be replaced
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, as mentioned, fixing these existing tests should not block this PR. Please file a JIRA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants