Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import mock

import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.io import Read
from apache_beam.io import Write
from apache_beam.io.gcp.pubsub import MultipleReadFromPubSub
Expand Down Expand Up @@ -364,6 +365,7 @@ def test_expand_with_wrong_source(self):

@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestWriteStringsToPubSubOverride(unittest.TestCase):
@mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock())
def test_expand_deprecated(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
Expand All @@ -385,6 +387,7 @@ def test_expand_deprecated(self):
# Ensure that the properties passed through correctly
self.assertEqual('a_topic', write_transform.dofn.short_topic_name)

@mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock())
def test_expand(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
Expand Down
19 changes: 19 additions & 0 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,12 @@ def apply(
if type_options.pipeline_type_check:
transform.type_check_inputs(pvalueish)

self._assert_not_applying_PDone(pvalueish, transform)

pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
if pvalueish_result is None:
pvalueish_result = pvalue.PDone(self)
pvalueish_result.producer = current

if type_options is not None and type_options.pipeline_type_check:
transform.type_check_outputs(pvalueish_result)
Expand Down Expand Up @@ -845,6 +850,20 @@ def apply(
self.transforms_stack.pop()
return pvalueish_result

def _assert_not_applying_PDone(
self,
pvalueish, # type: Optional[pvalue.PValue]
transform # type: ptransform.PTransform
):
if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo):
# If the input is a PDone, we cannot apply a ParDo transform.
full_label = self._current_transform().full_label
producer_label = pvalueish.producer.full_label
raise TypeCheckError(
f'Transform "{full_label}" was applied to the output of '
f'"{producer_label}" but "{producer_label.split("/")[-1]}" '
'produces no PCollections.')

def _generate_unique_label(
self,
transform # type: str
Expand Down
21 changes: 21 additions & 0 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.typehints import TypeCheckError
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MIN_TIMESTAMP

Expand Down Expand Up @@ -157,6 +158,26 @@ def test_create(self):
pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')

def test_unexpected_PDone_errmsg(self):
"""
Test that a nice error message is raised if a transform that
returns None (i.e. produces no PCollection) is used as input
to a PTransform.
"""
class DoNothingTransform(PTransform):
def expand(self, pcoll):
return None

class ParentTransform(PTransform):
def expand(self, pcoll):
return pcoll | DoNothingTransform()

with pytest.raises(
TypeCheckError,
match=r".*applied to the output.*ParentTransform/DoNothingTransform"):
with TestPipeline() as pipeline:
_ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1)

@mock.patch('logging.info')
def test_runner_overrides_default_pickler(self, mock_info):
with mock.patch.object(PipelineRunner,
Expand Down
Loading