From 3426c6e6b53d00ce49ec4b9be70cad12a1f90b03 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Wed, 4 Jun 2025 22:42:44 -0400 Subject: [PATCH 1/3] add friendly error message for when transform is applied to no output --- sdks/python/apache_beam/pipeline.py | 12 ++++++++++++ sdks/python/apache_beam/pipeline_test.py | 21 +++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index c011732f9352..6a7d84a3c2bc 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -798,7 +798,19 @@ def apply( if type_options.pipeline_type_check: transform.type_check_inputs(pvalueish) + if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo): + # If the input is a PDone, we cannot apply a ParDo transform. + full_label = self._current_transform().full_label + producer_label = pvalueish.producer.full_label + raise TypeCheckError( + f'Transform "{full_label}" was applied to the output of ' + f'"{producer_label}" but "{producer_label.split("/")[-1]}" ' + 'produces no PCollections.') + pvalueish_result = self.runner.apply(transform, pvalueish, self._options) + if pvalueish_result is None: + pvalueish_result = pvalue.PDone(self) + pvalueish_result.producer = current if type_options is not None and type_options.pipeline_type_check: transform.type_check_outputs(pvalueish_result) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index b18bc0d668e2..658df5d04cee 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -61,6 +61,7 @@ from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms.window import SlidingWindows from apache_beam.transforms.window import TimestampedValue +from apache_beam.typehints import TypeCheckError from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import MIN_TIMESTAMP @@ -157,6 +158,26 @@ def test_create(self): pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10]) assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3') + def test_PEnd_errmsg(self): + """ + Test that a nice error message is raised if a transform that + returns None (i.e. produces no PCollection) is used as input + to a PTransform. + """ + class DoNothingTransform(PTransform): + def expand(self, pcoll): + return None + + class ParentTransform(PTransform): + def expand(self, pcoll): + return pcoll | DoNothingTransform() + + with pytest.raises( + TypeCheckError, + match=r".*applied to the output.*ParentTransform/DoNothingTransform"): + with TestPipeline() as pipeline: + _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1) + @mock.patch('logging.info') def test_runner_overrides_default_pickler(self, mock_info): with mock.patch.object(PipelineRunner, From 16d5e7332b0da81bc18af65d6797ea73720568f9 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 5 Jun 2025 08:12:09 -0400 Subject: [PATCH 2/3] update test name --- sdks/python/apache_beam/pipeline_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 658df5d04cee..fcc00fa928f5 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -158,7 +158,7 @@ def test_create(self): pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10]) assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3') - def test_PEnd_errmsg(self): + def test_unexpected_PDone_errmsg(self): """ Test that a nice error message is raised if a transform that returns None (i.e. produces no PCollection) is used as input From 807ef5eb8a55548ec2092933a4aaa2048fea0b75 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 5 Jun 2025 10:05:54 -0400 Subject: [PATCH 3/3] Fix pubsub unit tests that depend on old behavior --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 3 +++ sdks/python/apache_beam/pipeline.py | 23 ++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index feee9dc0082b..fadc49461a3c 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -27,6 +27,7 @@ import mock import apache_beam as beam +from apache_beam import Pipeline from apache_beam.io import Read from apache_beam.io import Write from apache_beam.io.gcp.pubsub import MultipleReadFromPubSub @@ -364,6 +365,7 @@ def test_expand_with_wrong_source(self): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestWriteStringsToPubSubOverride(unittest.TestCase): + @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock()) def test_expand_deprecated(self): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True @@ -385,6 +387,7 @@ def test_expand_deprecated(self): # Ensure that the properties passed through correctly self.assertEqual('a_topic', write_transform.dofn.short_topic_name) + @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock()) def test_expand(self): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 6a7d84a3c2bc..9db310a1db41 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -798,14 +798,7 @@ def apply( if type_options.pipeline_type_check: transform.type_check_inputs(pvalueish) - if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo): - # If the input is a PDone, we cannot apply a ParDo transform. - full_label = self._current_transform().full_label - producer_label = pvalueish.producer.full_label - raise TypeCheckError( - f'Transform "{full_label}" was applied to the output of ' - f'"{producer_label}" but "{producer_label.split("/")[-1]}" ' - 'produces no PCollections.') + self._assert_not_applying_PDone(pvalueish, transform) pvalueish_result = self.runner.apply(transform, pvalueish, self._options) if pvalueish_result is None: @@ -857,6 +850,20 @@ def apply( self.transforms_stack.pop() return pvalueish_result + def _assert_not_applying_PDone( + self, + pvalueish, # type: Optional[pvalue.PValue] + transform # type: ptransform.PTransform + ): + if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo): + # If the input is a PDone, we cannot apply a ParDo transform. + full_label = self._current_transform().full_label + producer_label = pvalueish.producer.full_label + raise TypeCheckError( + f'Transform "{full_label}" was applied to the output of ' + f'"{producer_label}" but "{producer_label.split("/")[-1]}" ' + 'produces no PCollections.') + def _generate_unique_label( self, transform # type: str