From f0f7a4200ae5e69fa1951999334c9f1e72b5aacf Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 23 Apr 2025 17:39:28 -0700 Subject: [PATCH 1/2] Attempt prism for pipelines with unbounded PCollections. Also fix error with terminal state checking. --- sdks/python/apache_beam/runners/direct/direct_runner.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8b8937653688..de330ab05ac6 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -110,6 +110,10 @@ def visit_transform(self, applied_ptransform): if timer.time_domain == TimeDomain.REAL_TIME: self.supported_by_fnapi_runner = False + def visit_value(self, value, producer_node): + if not value.is_bounded: + self.supported_by_fnapi_runner = False + class _PrismRunnerSupportVisitor(PipelineVisitor): """Visitor determining if a Pipeline can be run on the PrismRunner.""" def accept(self, pipeline): @@ -167,7 +171,8 @@ def visit_transform(self, applied_ptransform): pr = runner.run_pipeline(pipeline, options) # This is non-blocking, so if the state is *already* finished, something # probably failed on job submission. - if pr.state.is_terminal() and pr.state != PipelineState.DONE: + if (PipelineState.is_terminal(pr.state) and + pr.state != PipelineState.DONE): _LOGGER.info( 'Pipeline failed on PrismRunner, falling back toDirectRunner.') runner = BundleBasedDirectRunner() From 9cc1da4b2322f71dc9a373c84f79519ebb8f2a71 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 24 Apr 2025 17:24:07 -0700 Subject: [PATCH 2/2] Consider small periodic impulses as bounded. 
--- .../apache_beam/transforms/combiners_test.py | 1 + .../apache_beam/transforms/periodicsequence.py | 16 +++++++++++++--- .../transforms/periodicsequence_test.py | 3 ++- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index a8979239f831..cf423c1124bf 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -991,6 +991,7 @@ def test_combine_globally_for_unbounded_source_with_default(self): stop_timestamp=time.time() + 4, fire_interval=1, apply_windowing=False, + is_bounded=False, ) | beam.Map(lambda x: ('c', 1)) | beam.WindowInto( diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index d06651a9504b..ed6529068e51 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -178,25 +178,35 @@ def __init__( start_timestamp=Timestamp.now(), stop_timestamp=MAX_TIMESTAMP, fire_interval=360.0, - apply_windowing=False): + apply_windowing=False, + is_bounded=None): ''' :param start_timestamp: Timestamp for first element. :param stop_timestamp: Timestamp after which no elements will be output. :param fire_interval: Interval in seconds at which to output elements. :param apply_windowing: Whether each element should be assigned to individual window. If false, all elements will reside in global window. + :param is_bounded: Whether to treat the output PCollection as bounded. + Defaults to True when stop_timestamp - start_timestamp < 60, and False otherwise. 
''' self.start_ts = start_timestamp self.stop_ts = stop_timestamp self.interval = fire_interval self.apply_windowing = apply_windowing + self.is_bounded = ( + stop_timestamp - start_timestamp < 60 + if is_bounded is None else is_bounded) def expand(self, pbegin): - result = ( + sequence = ( pbegin | 'ImpulseElement' >> beam.Create( [(self.start_ts, self.stop_ts, self.interval)]) - | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn()) + | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())) + if self.is_bounded: + sequence.is_bounded = True + result = ( + sequence | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))) if self.apply_windowing: result = result | 'ApplyWindowing' >> beam.WindowInto( diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index 221520c94622..a7433a604b0d 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -91,7 +91,8 @@ def test_periodicimpulse_default_start(self): assert is_same_type, error with TestPipeline() as p: - result = p | 'PeriodicImpulse' >> PeriodicImpulse(it, et, interval) + result = p | 'PeriodicImpulse' >> PeriodicImpulse( + it, et, interval, is_bounded=False) k = [it + x * interval for x in range(0, int(duration / interval))] self.assertEqual(result.is_bounded, False)