From 95185780309c33ae361a5ee53e9f253036a62951 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Sun, 6 Sep 2020 17:56:18 -0400 Subject: [PATCH 1/7] Support for NestedValueProvider for Python SDK --- .../apache_beam/options/value_provider.py | 58 ++++++++++++++++++- .../options/value_provider_test.py | 39 +++++++++++++ 2 files changed, 94 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/options/value_provider.py b/sdks/python/apache_beam/options/value_provider.py index fde61c85db77..d460720a2372 100644 --- a/sdks/python/apache_beam/options/value_provider.py +++ b/sdks/python/apache_beam/options/value_provider.py @@ -15,8 +15,9 @@ # limitations under the License. # -"""A ValueProvider class to implement templates with both statically -and dynamically provided values. +"""A ValueProvider abstracts the notion of fetching a value that may or may not be currently available. + +This can be used to parameterize transforms that only read values in at runtime, for example. """ # pytype: skip-file @@ -33,22 +34,34 @@ 'ValueProvider', 'StaticValueProvider', 'RuntimeValueProvider', + 'NestedValueProvider', 'check_accessible', ] class ValueProvider(object): + """Base class that all other ValueProviders must implement. + """ def is_accessible(self): + """Whether the contents of this ValueProvider is available to routines that run at graph construction time.""" raise NotImplementedError( 'ValueProvider.is_accessible implemented in derived classes') def get(self): + """Return the value wrapped by this ValueProvider. + """ raise NotImplementedError( 'ValueProvider.get implemented in derived classes') class StaticValueProvider(ValueProvider): + """StaticValueProvider is an implementation of ValueProvider that allows for a static value to be provided.""" def __init__(self, value_type, value): + """ + Args: + value_type: Type of the static value + value: Static value + """ self.value_type = value_type self.value = value_type(value) @@ -78,6 +91,8 @@ def __hash__(self): class RuntimeValueProvider(ValueProvider): + """RuntimeValueProvider is an implementation of ValueProvider that allows for a value to be provided at execution time rather than at graph construction time. + """ runtime_options = None experiments = set() # type: Set[str] @@ -122,8 +137,45 @@ def __str__(self): repr(self.default_value)) +class NestedValueProvider(ValueProvider): + """NestedValueProvider is an implementation of ValueProvider that allows for wrapping another ValueProvider object.""" + def __init__(self, value, translator): + """Creates a NestedValueProvider that wraps the provided ValueProvider. + + Args: + value: ValueProvider object to wrap + translator: function that is applied to the ValueProvider + Raises: + RuntimeValueProviderError: if any of the provided objects are not accessible + """ + self.value = value + self.translator = translator + + def is_accessible(self): + return self.value.is_accessible() + + def get(self): + try: + return self.cached_value + except AttributeError: + self.cached_value = self.translator(self.value.get()) + return self.cached_value + + def __str__(self): + return "%s(value: %s, translator: %s)" % ( + self.__class__.__name__, + self.value, + self.translator.__name__, + ) + + def check_accessible(value_provider_list): - """Check accessibility of a list of ValueProvider objects.""" + """A decorator that checks accessibility of a list of ValueProvider objects. + + Args: + value_provider_list: list of ValueProvider objects + Raises: + RuntimeValueProviderError: if any of the provided objects are not accessible.""" assert isinstance(value_provider_list, list) def _check_accessible(fnc): diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py index 52525607cb8e..33696283d24e 100644 --- a/sdks/python/apache_beam/options/value_provider_test.py +++ b/sdks/python/apache_beam/options/value_provider_test.py @@ -23,11 +23,13 @@ import logging import unittest +from mock import Mock from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.options.value_provider import StaticValueProvider +from apache_beam.options.value_provider import NestedValueProvider # TODO(BEAM-1319): Require unique names only within a test. @@ -218,6 +220,43 @@ def test_experiments_options_setup(self): self.assertIn('b,c', options.experiments) self.assertNotIn('c', options.experiments) + def test_nested_value_provider_wrap_static(self): + vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1) + + self.assertTrue(vp.is_accessible()) + self.assertEqual(vp.get(), 2) + + def test_nested_value_provider_caches_value(self): + mock_fn = Mock() + + def translator(x): + mock_fn() + return x + + vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1) + + vp.get_value() + self.assertEqual(mock_fn.call_count, 1) + vp.get_value() + self.assertEqual(mock_fn.call_count, 1) + + def test_nested_value_provider_wrap_runtime(self): + class UserDefinedOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_value_provider_argument( + '--vpt_vp_arg15', + help='This keyword argument is a value provider') # set at runtime + + options = UserDefinedOptions([]) + vp = NestedValueProvider(options.vpt_vp_arg15, lambda x: x + x) + self.assertFalse(vp.is_accessible()) + + RuntimeValueProvider.set_runtime_options({'vpt_vp_arg15': 'abc'}) + + self.assertTrue(vp.is_accessible()) + self.assertEqual(vp.get(), 'abcabc') + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 3aa448bf7ab7b47b66c24751739745b449c07de1 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Sun, 6 Sep 2020 21:15:13 -0400 Subject: [PATCH 2/7] Fix typo --- sdks/python/apache_beam/options/value_provider_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py index 33696283d24e..9305aa045831 100644 --- a/sdks/python/apache_beam/options/value_provider_test.py +++ b/sdks/python/apache_beam/options/value_provider_test.py @@ -235,9 +235,9 @@ def translator(x): vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1) - vp.get_value() + vp.get() self.assertEqual(mock_fn.call_count, 1) - vp.get_value() + vp.get() self.assertEqual(mock_fn.call_count, 1) def test_nested_value_provider_wrap_runtime(self): From f1232bd7b140b3194119fc087b7333e7c015c89d Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Sun, 6 Sep 2020 21:18:51 -0400 Subject: [PATCH 3/7] Update CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 80204526a310..8d45e157dd79 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,7 @@ More details will be in an upcoming [blog post](https://beam.apache.org/blog/python-performance-runtime-type-checking/index.html). * Added support for Python 3 type annotations on PTransforms using typed PCollections ([BEAM-10258](https://issues.apache.org/jira/browse/BEAM-10258)). More details will be in an upcoming [blog post](https://beam.apache.org/blog/python-improved-annotations/index.html). +* Added support for NestedValueProvider for the Python SDK ([BEAM-10856](https://issues.apache.org/jira/browse/BEAM-10856)). * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). ## Breaking Changes From 2d8f3f3b4a57ab52ff2bad382800ad6fb997cf63 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Sun, 6 Sep 2020 22:00:09 -0400 Subject: [PATCH 4/7] Update value_provider_test.py --- sdks/python/apache_beam/options/value_provider_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py index 9305aa045831..19be77f8a6fa 100644 --- a/sdks/python/apache_beam/options/value_provider_test.py +++ b/sdks/python/apache_beam/options/value_provider_test.py @@ -233,7 +233,7 @@ def translator(x): mock_fn() return x - vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1) + vp = NestedValueProvider(StaticValueProvider(int, 1), translator) vp.get() self.assertEqual(mock_fn.call_count, 1) From 006a79fa1535e0e7fbd222ac0dd3e9edbdd61b27 Mon Sep 17 00:00:00 2001 From: Eugene Nikolaiev Date: Sun, 15 Nov 2020 21:12:53 +0200 Subject: [PATCH 5/7] Fix NestedValueProvider docstrings. (#1) --- .../apache_beam/options/value_provider.py | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/options/value_provider.py b/sdks/python/apache_beam/options/value_provider.py index d460720a2372..79987c396360 100644 --- a/sdks/python/apache_beam/options/value_provider.py +++ b/sdks/python/apache_beam/options/value_provider.py @@ -15,9 +15,11 @@ # limitations under the License. # -"""A ValueProvider abstracts the notion of fetching a value that may or may not be currently available. +"""A ValueProvider abstracts the notion of fetching a value that may or +may not be currently available. -This can be used to parameterize transforms that only read values in at runtime, for example. +This can be used to parameterize transforms that only read values in at +runtime, for example. """ # pytype: skip-file @@ -43,7 +45,9 @@ class ValueProvider(object): """Base class that all other ValueProviders must implement. """ def is_accessible(self): - """Whether the contents of this ValueProvider is available to routines that run at graph construction time.""" + """Whether the contents of this ValueProvider is available to routines + that run at graph construction time. + """ raise NotImplementedError( 'ValueProvider.is_accessible implemented in derived classes') @@ -55,7 +59,9 @@ def get(self): class StaticValueProvider(ValueProvider): - """StaticValueProvider is an implementation of ValueProvider that allows for a static value to be provided.""" + """StaticValueProvider is an implementation of ValueProvider that allows + for a static value to be provided. + """ def __init__(self, value_type, value): """ Args: @@ -91,7 +97,9 @@ def __hash__(self): class RuntimeValueProvider(ValueProvider): - """RuntimeValueProvider is an implementation of ValueProvider that allows for a value to be provided at execution time rather than at graph construction time. + """RuntimeValueProvider is an implementation of ValueProvider that + allows for a value to be provided at execution time rather than + at graph construction time. """ runtime_options = None experiments = set() # type: Set[str] @@ -138,7 +146,9 @@ def __str__(self): class NestedValueProvider(ValueProvider): - """NestedValueProvider is an implementation of ValueProvider that allows for wrapping another ValueProvider object.""" + """NestedValueProvider is an implementation of ValueProvider that allows + for wrapping another ValueProvider object. + """ def __init__(self, value, translator): """Creates a NestedValueProvider that wraps the provided ValueProvider. @@ -146,7 +156,8 @@ def __init__(self, value, translator): value: ValueProvider object to wrap translator: function that is applied to the ValueProvider Raises: - RuntimeValueProviderError: if any of the provided objects are not accessible + RuntimeValueProviderError: if any of the provided objects are not + accessible. """ self.value = value self.translator = translator @@ -171,11 +182,13 @@ def __str__(self): def check_accessible(value_provider_list): """A decorator that checks accessibility of a list of ValueProvider objects. - + Args: value_provider_list: list of ValueProvider objects Raises: - RuntimeValueProviderError: if any of the provided objects are not accessible.""" + RuntimeValueProviderError: if any of the provided objects are not + accessible. + """ assert isinstance(value_provider_list, list) def _check_accessible(fnc): From c455e466b3de5d42db7d8ea6a65378defd4666ec Mon Sep 17 00:00:00 2001 From: Eugene Nikolaiev Date: Fri, 20 Nov 2020 21:13:45 +0200 Subject: [PATCH 6/7] Fix isort and doc errors. (#2) --- sdks/python/apache_beam/options/value_provider.py | 4 ++-- sdks/python/apache_beam/options/value_provider_test.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/options/value_provider.py b/sdks/python/apache_beam/options/value_provider.py index 79987c396360..0fa5f2b5f157 100644 --- a/sdks/python/apache_beam/options/value_provider.py +++ b/sdks/python/apache_beam/options/value_provider.py @@ -156,7 +156,7 @@ def __init__(self, value, translator): value: ValueProvider object to wrap translator: function that is applied to the ValueProvider Raises: - RuntimeValueProviderError: if any of the provided objects are not + ``RuntimeValueProviderError``: if any of the provided objects are not accessible. """ self.value = value @@ -186,7 +186,7 @@ def check_accessible(value_provider_list): Args: value_provider_list: list of ValueProvider objects Raises: - RuntimeValueProviderError: if any of the provided objects are not + ``RuntimeValueProviderError``: if any of the provided objects are not accessible. """ assert isinstance(value_provider_list, list) diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py index 19be77f8a6fa..189501bb9eed 100644 --- a/sdks/python/apache_beam/options/value_provider_test.py +++ b/sdks/python/apache_beam/options/value_provider_test.py @@ -23,13 +23,14 @@ import logging import unittest + from mock import Mock from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.value_provider import NestedValueProvider from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.options.value_provider import StaticValueProvider -from apache_beam.options.value_provider import NestedValueProvider # TODO(BEAM-1319): Require unique names only within a test. From a314678a101158222d9fe1c19517aabd23ec2db0 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Fri, 20 Nov 2020 18:10:53 -0500 Subject: [PATCH 7/7] Update CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 3498c39ee1bb..ece103c21318 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -95,6 +95,7 @@ * Added support for avro payload format in Beam SQL Pubsub Table ([BEAM-5504](https://issues.apache.org/jira/browse/BEAM-5504)) * Added option to disable unnecessary copying between operators in Flink Runner (Java) ([BEAM-11146](https://issues.apache.org/jira/browse/BEAM-11146)) * Added CombineFn.setup and CombineFn.teardown to Python SDK. These methods let you initialize the CombineFn's state before any of the other methods of the CombineFn is executed and clean that state up later on. If you are using Dataflow, you need to enable Dataflow Runner V2 by passing `--experiments=use_runner_v2` before using this feature. ([BEAM-3736](https://issues.apache.org/jira/browse/BEAM-3736)) +* Added support for NestedValueProvider for the Python SDK ([BEAM-10856](https://issues.apache.org/jira/browse/BEAM-10856)). * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). ## Breaking Changes @@ -136,7 +137,6 @@ More details will be in an upcoming [blog post](https://beam.apache.org/blog/python-performance-runtime-type-checking/index.html). * Added support for Python 3 type annotations on PTransforms using typed PCollections ([BEAM-10258](https://issues.apache.org/jira/browse/BEAM-10258)). More details will be in an upcoming [blog post](https://beam.apache.org/blog/python-improved-annotations/index.html). -* Added support for NestedValueProvider for the Python SDK ([BEAM-10856](https://issues.apache.org/jira/browse/BEAM-10856)). * Improved the Interactive Beam API where recording streaming jobs now start a long running background recording job. Running ib.show() or ib.collect() samples from the recording ([BEAM-10603](https://issues.apache.org/jira/browse/BEAM-10603)). * In Interactive Beam, ib.show() and ib.collect() now have "n" and "duration" as parameters. These mean read only up to "n" elements and up to "duration" seconds of data read from the recording ([BEAM-10603](https://issues.apache.org/jira/browse/BEAM-10603)). * Initial preview of [Dataframes](https://s.apache.org/simpler-python-pipelines-2020#slide=id.g905ac9257b_1_21) support.