diff --git a/CHANGES.md b/CHANGES.md index 6dd8504db794..916fc4f26f39 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -76,6 +76,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)). * [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support ## Breaking Changes diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8b8937653688..1df2e88f6140 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -167,7 +167,8 @@ def visit_transform(self, applied_ptransform): pr = runner.run_pipeline(pipeline, options) # This is non-blocking, so if the state is *already* finished, something # probably failed on job submission. - if pr.state.is_terminal() and pr.state != PipelineState.DONE: + if (PipelineState.is_terminal(pr.state) and + pr.state != PipelineState.DONE): _LOGGER.info( 'Pipeline failed on PrismRunner, falling back toDirectRunner.') runner = BundleBasedDirectRunner()