From 90bd0095cbadff045e156592d3437d126255d900 Mon Sep 17 00:00:00 2001 From: Udi Meiri Date: Thu, 12 Sep 2019 18:42:42 -0700 Subject: [PATCH 1/3] [BEAM-8224] Fix bug in _fn_takes_side_inputs Bug was introduced in the conversion to inspect.signature, and manifests as assigning a wrapper function that accepts a single positional argument when the wrapped function can accept an arbitrary number. --- sdks/python/apache_beam/transforms/core.py | 4 +- .../typehints/typed_pipeline_test.py | 43 +++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 5f68294972c6..4c8fd3184749 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -618,7 +618,9 @@ def _fn_takes_side_inputs(fn): # We can't tell; maybe it does. return True - return len(signature.parameters) > 1 + return (len(signature.parameters) > 1 or + any(p.kind == p.VAR_POSITIONAL or p.kind == p.VAR_KEYWORD + for p in signature.parameters.values())) class CallableWrapperDoFn(DoFn): diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 82c4a63c4225..be72cd8f4181 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -221,9 +221,46 @@ def repeat(s, *times): result = ['a', 'bb', 'c'] | beam.Map(repeat, 3) self.assertEqual(['aaa', 'bbbbbb', 'ccc'], sorted(result)) - # TODO(robertwb): Support partially defined varargs. - # with self.assertRaises(typehints.TypeCheckError): - # ['a', 'bb', 'c'] | beam.Map(repeat, 'z') + if sys.version_info >= (3,): + with self.assertRaisesRegexp( + typehints.TypeCheckError, + r'requires Tuple\[int, ...\] but got Tuple\[str, ...\]'): + ['a', 'bb', 'c'] | beam.Map(repeat, 'z') + + def test_var_positional_only_side_input_hint(self): + # Test that a lambda that accepts only a VAR_POSITIONAL can accept + # side-inputs. + result = (['a', 'b', 'c'] + | beam.Map(lambda *_: 'a', 5).with_input_types(str, int)) + self.assertEqual(['a', 'a', 'a'], sorted(result)) + + # Type hint order doesn't matter for VAR_POSITIONAL. + result = (['a', 'b', 'c'] + | beam.Map(lambda *_: 'a', 5).with_input_types(int, str)) + self.assertEqual(['a', 'a', 'a'], sorted(result)) + + if sys.version_info >= (3,): + with self.assertRaisesRegexp( + typehints.TypeCheckError, + r'requires Tuple\[Union\[int, str\], ...\] but got ' + r'Tuple\[Union\[float, int\], ...\]'): + _ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str) + + def test_var_keyword_side_input_hint(self): + # Test that a lambda that accepts a VAR_KEYWORD can accept + # side-inputs. + result = (['a', 'b', 'c'] + | beam.Map(lambda e, **_: 'a', kw=5) + .with_input_types(str, ignored=int)) + self.assertEqual(['a', 'a', 'a'], sorted(result)) + + if sys.version_info >= (3,): + with self.assertRaisesRegexp( + typehints.TypeCheckError, + r'requires Dict\[str, str\] but got Dict\[str, int\]'): + _ = (['a', 'b', 'c'] + | beam.Map(lambda e, **_: 'a', kw=5) + .with_input_types(str, ignored=str)) def test_deferred_side_inputs(self): @typehints.with_input_types(str, int) From 074e456e05a582ed46ab0274e98938509f60760a Mon Sep 17 00:00:00 2001 From: Udi Meiri Date: Fri, 13 Sep 2019 16:45:55 -0700 Subject: [PATCH 2/3] Have lambdas return arguments and check them. --- .../apache_beam/typehints/typed_pipeline_test.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index be72cd8f4181..b35c223b4fe0 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -231,13 +231,13 @@ def test_var_positional_only_side_input_hint(self): # Test that a lambda that accepts only a VAR_POSITIONAL can accept # side-inputs. result = (['a', 'b', 'c'] - | beam.Map(lambda *_: 'a', 5).with_input_types(str, int)) - self.assertEqual(['a', 'a', 'a'], sorted(result)) + | beam.Map(lambda *args: args, 5).with_input_types(str, int)) + self.assertEqual([('a', 5), ('b', 5), ('c', 5)], sorted(result)) # Type hint order doesn't matter for VAR_POSITIONAL. result = (['a', 'b', 'c'] - | beam.Map(lambda *_: 'a', 5).with_input_types(int, str)) - self.assertEqual(['a', 'a', 'a'], sorted(result)) + | beam.Map(lambda *args: args, 5).with_input_types(int, str)) + self.assertEqual([('a', 5), ('b', 5), ('c', 5)], sorted(result)) if sys.version_info >= (3,): with self.assertRaisesRegexp( @@ -250,9 +250,10 @@ def test_var_keyword_side_input_hint(self): # Test that a lambda that accepts a VAR_KEYWORD can accept # side-inputs. result = (['a', 'b', 'c'] - | beam.Map(lambda e, **_: 'a', kw=5) + | beam.Map(lambda e, **kwargs: (e, kwargs), kw=5) .with_input_types(str, ignored=int)) - self.assertEqual(['a', 'a', 'a'], sorted(result)) + self.assertEqual([('a', {'kw': 5}), ('b', {'kw': 5}), ('c', {'kw': 5})], + sorted(result)) if sys.version_info >= (3,): with self.assertRaisesRegexp( From 234b5dea165c11d0680c07cd5b055e0492934217 Mon Sep 17 00:00:00 2001 From: Udi Meiri Date: Mon, 16 Sep 2019 16:50:58 -0700 Subject: [PATCH 3/3] Fix test_var_positional_only_side_input_hint Worked around an issue with trivial_inference not being passed side-input hints. --- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 4c8fd3184749..6245bb7703fb 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -585,7 +585,7 @@ def default_type_hints(self): # TODO(sourabhbajaj): Do we want to remove the responsibility of these from # the DoFn or maybe the runner def infer_output_type(self, input_type): - # TODO(robertwb): Side inputs types. + # TODO(BEAM-8247): Side inputs types. # TODO(robertwb): Assert compatibility with input type hint? return self._strip_output_annotations( trivial_inference.infer_return_type(self.process, [input_type])) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index b35c223b4fe0..48129ecbec56 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -230,13 +230,18 @@ def repeat(s, *times): def test_var_positional_only_side_input_hint(self): # Test that a lambda that accepts only a VAR_POSITIONAL can accept # side-inputs. + # TODO(BEAM-8247): There's a bug with trivial_inference inferring the output + # type when side-inputs are used (their type hints are not passed). Remove + # with_output_types(...) when this bug is fixed. result = (['a', 'b', 'c'] - | beam.Map(lambda *args: args, 5).with_input_types(str, int)) + | beam.Map(lambda *args: args, 5).with_input_types(int, str) + .with_output_types(typehints.Tuple[str, int])) self.assertEqual([('a', 5), ('b', 5), ('c', 5)], sorted(result)) # Type hint order doesn't matter for VAR_POSITIONAL. result = (['a', 'b', 'c'] - | beam.Map(lambda *args: args, 5).with_input_types(int, str)) + | beam.Map(lambda *args: args, 5).with_input_types(int, str) + .with_output_types(typehints.Tuple[str, int])) self.assertEqual([('a', 5), ('b', 5), ('c', 5)], sorted(result)) if sys.version_info >= (3,):