diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index feee9dc0082b..fadc49461a3c 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -27,6 +27,7 @@ import mock import apache_beam as beam +from apache_beam import Pipeline from apache_beam.io import Read from apache_beam.io import Write from apache_beam.io.gcp.pubsub import MultipleReadFromPubSub @@ -364,6 +365,7 @@ def test_expand_with_wrong_source(self): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestWriteStringsToPubSubOverride(unittest.TestCase): + @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock()) def test_expand_deprecated(self): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True @@ -385,6 +387,7 @@ def test_expand_deprecated(self): # Ensure that the properties passed through correctly self.assertEqual('a_topic', write_transform.dofn.short_topic_name) + @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock()) def test_expand(self): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index c011732f9352..9db310a1db41 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -798,7 +798,12 @@ def apply( if type_options.pipeline_type_check: transform.type_check_inputs(pvalueish) + self._assert_not_applying_PDone(pvalueish, transform) + pvalueish_result = self.runner.apply(transform, pvalueish, self._options) + if pvalueish_result is None: + pvalueish_result = pvalue.PDone(self) + pvalueish_result.producer = current if type_options is not None and type_options.pipeline_type_check: transform.type_check_outputs(pvalueish_result) @@ -845,6 +850,20 @@ def apply( self.transforms_stack.pop() return pvalueish_result + def _assert_not_applying_PDone( + self, + pvalueish, # type: Optional[pvalue.PValue] + transform # type: ptransform.PTransform + ): + if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo): + # If the input is a PDone, we cannot apply a ParDo transform. + full_label = self._current_transform().full_label + producer_label = pvalueish.producer.full_label + raise TypeCheckError( + f'Transform "{full_label}" was applied to the output of ' + f'"{producer_label}" but "{producer_label.split("/")[-1]}" ' + 'produces no PCollections.') + def _generate_unique_label( self, transform # type: str diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index b18bc0d668e2..fcc00fa928f5 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -61,6 +61,7 @@ from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms.window import SlidingWindows from apache_beam.transforms.window import TimestampedValue +from apache_beam.typehints import TypeCheckError from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MIN_TIMESTAMP @@ -157,6 +158,26 @@ def test_create(self): pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10]) assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3') + def test_unexpected_PDone_errmsg(self): + """ + Test that a nice error message is raised if a transform that + returns None (i.e. produces no PCollection) is used as input + to a PTransform. + """ + class DoNothingTransform(PTransform): + def expand(self, pcoll): + return None + + class ParentTransform(PTransform): + def expand(self, pcoll): + return pcoll | DoNothingTransform() + + with pytest.raises( + TypeCheckError, + match=r".*applied to the output.*ParentTransform/DoNothingTransform"): + with TestPipeline() as pipeline: + _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1) + @mock.patch('logging.info') def test_runner_overrides_default_pickler(self, mock_info): with mock.patch.object(PipelineRunner,