From 2e1a6c29d5b6105b460f26bdf5839c484648b4c3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 23 Jun 2025 22:05:10 -0400 Subject: [PATCH 1/8] Move PeriodicStream into periodicsequence. --- sdks/python/apache_beam/ml/ts/util.py | 193 ------------------ sdks/python/apache_beam/ml/ts/util_test.py | 93 --------- .../fn_api_runner/fn_runner_test.py | 2 +- .../transforms/periodicsequence.py | 148 ++++++++++++-- .../transforms/periodicsequence_test.py | 65 ++++++ 5 files changed, 199 insertions(+), 302 deletions(-) delete mode 100644 sdks/python/apache_beam/ml/ts/util.py delete mode 100644 sdks/python/apache_beam/ml/ts/util_test.py diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py deleted file mode 100644 index a50f2667621b..000000000000 --- a/sdks/python/apache_beam/ml/ts/util.py +++ /dev/null @@ -1,193 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import math -import time -from typing import Any -from typing import Optional -from typing import Sequence - -import apache_beam as beam -from apache_beam.io.watermark_estimators import ManualWatermarkEstimator -from apache_beam.runners import sdf_utils -from apache_beam.transforms.periodicsequence import ImpulseSeqGenRestrictionProvider # pylint:disable=line-too-long -from apache_beam.transforms.window import TimestampedValue -from apache_beam.utils import timestamp - - -class ImpulseStreamGenDoFn(beam.DoFn): - """ - Generates a periodic, unbounded stream of elements from a provided sequence. - - (Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence) - - This Splittable DoFn (SDF) is designed to simulate a continuous stream of - data for testing or demonstration purposes. It takes a Python sequence (e.g., - a list) and emits its elements one by one in a loop, assigning a timestamp - to each. - - The DoFn operates in two modes based on the structure of the input `data`: - - - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values - (e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each - emitted element. The timestamps are calculated by starting at a given - `start_time` and incrementing by a fixed `interval`. - - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each - tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn - will use the provided timestamp for the emitted element. - - The rate of emission is controlled by wall-clock time. The DoFn will only - emit elements whose timestamp (either calculated or provided) is in the past - compared to the current system time. When it "catches up" to the present, - it will pause and defer the remainder of the work. - - Args: - data: The sequence of elements to emit into the PCollection. 
The elements - can be raw values or pre-timestamped tuples in the format - `(apache_beam.utils.timestamp.Timestamp, value)`. - """ - def __init__(self, data: Sequence[Any]): - self._data = data - self._len = len(data) - self._is_timestamped_value = len(data) > 0 and isinstance( - data[0], tuple) and isinstance(data[0][0], timestamp.Timestamp) - - def _get_timestamped_value(self, index, current_output_timestamp): - if self._is_timestamped_value: - event_time, value = self._data[index % self._len] - return TimestampedValue(value, event_time) - else: - value = self._data[index % self._len] - return TimestampedValue(value, current_output_timestamp) - - @beam.DoFn.unbounded_per_element() - def process( - self, - element, - restriction_tracker=beam.DoFn.RestrictionParam( - ImpulseSeqGenRestrictionProvider()), - watermark_estimator=beam.DoFn.WatermarkEstimatorParam( - ManualWatermarkEstimator.default_provider())): - start, _, interval = element - - if isinstance(start, timestamp.Timestamp): - start = start.micros / 1000000 - - assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView) - - current_output_index = restriction_tracker.current_restriction().start - - while True: - current_output_timestamp = start + interval * current_output_index - - if current_output_timestamp > time.time(): - # we are too ahead of time, let's wait. - restriction_tracker.defer_remainder( - timestamp.Timestamp(current_output_timestamp)) - return - - if not restriction_tracker.try_claim(current_output_index): - # nothing to claim, just stop - return - - output = self._get_timestamped_value( - current_output_index, current_output_timestamp) - - current_watermark = watermark_estimator.current_watermark() - if current_watermark is None or output.timestamp > current_watermark: - # ensure watermark is monotonic - watermark_estimator.set_watermark(output.timestamp) - - yield output - - current_output_index += 1 - - -class PeriodicStream(beam.PTransform): - """A PTransform that generates a periodic stream of elements from a sequence. - - This transform creates a `PCollection` by emitting elements from a provided - Python sequence at a specified time interval. It is designed for use in - streaming pipelines to simulate a live, continuous source of data. - - The transform can be configured to: - - Emit the sequence only once. - - Repeat the sequence indefinitely or for a maximum duration. - - Control the time interval between elements. - - To ensure that the stream does not emit a burst of elements immediately at - pipeline startup, a fixed warmup period is added before the first element - is generated. - - Args: - data: The sequence of elements to emit into the PCollection. The elements - can be raw values or pre-timestamped tuples in the format - `(apache_beam.utils.timestamp.Timestamp, value)`. - max_duration: The maximum total duration in seconds for the stream - generation. If `None` (the default) and `repeat` is `True`, the - stream is effectively infinite. If `repeat` is `False`, the stream's - duration is the shorter of this value and the time required to emit - the sequence once. - interval: The delay in seconds between consecutive elements. - Defaults to 0.1. - repeat: If `True`, the input `data` sequence is emitted repeatedly. - If `False` (the default), the sequence is emitted only once. 
- """ - - WARMUP_TIME = 2 - - def __init__( - self, - data: Sequence[Any], - max_duration: Optional[float] = None, - interval: float = 0.1, - repeat: bool = False): - self._data = data - self._interval = interval - self._repeat = repeat - - # In `ImpulseSeqGenRestrictionProvider`, the total number of counts - # (i.e. total_outputs) is computed by ceil((end - start) / interval), - # where end is start + duration. - # Due to precision error of arithmetic operations, even if duration is set - # to len(self._data) * interval, (end - start) / interval could be a little - # bit smaller or bigger than len(self._data). - # In case of being bigger, total_outputs would be len(self._data) + 1, - # as the ceil() operation is used. - # Assuming that the precision error is no bigger than 1%, by subtracting - # a small amount, we ensure that the result after ceil is stable even if - # the precision error is present. - self._duration = len(self._data) * interval - 0.01 * interval - self._max_duration = max_duration if max_duration is not None else float( - "inf") - - def expand(self, pbegin): - # Give the runner some time to start up so the events will not cluster - # at the beginning. - start = timestamp.Timestamp.now() + PeriodicStream.WARMUP_TIME - - if not self._repeat: - stop = start + min(self._duration, self._max_duration) - else: - stop = timestamp.MAX_TIMESTAMP if math.isinf( - self._max_duration) else start + self._max_duration - - result = ( - pbegin - | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)]) - | 'GenStream' >> beam.ParDo(ImpulseStreamGenDoFn(self._data))) - return result diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py deleted file mode 100644 index 5a2a8a79ce89..000000000000 --- a/sdks/python/apache_beam/ml/ts/util_test.py +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import logging -import unittest - -import apache_beam as beam -from apache_beam.ml.ts.util import PeriodicStream -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms.window import FixedWindows -from apache_beam.utils.timestamp import Timestamp - - -class PeriodicStreamTest(unittest.TestCase): - def test_interval(self): - options = PipelineOptions() - start = Timestamp.now() - with beam.Pipeline(options=options) as p: - ret = ( - p | PeriodicStream([1, 2, 3, 4], interval=0.5) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])] - assert_that(ret, equal_to(expected)) - end = Timestamp.now() - self.assertGreaterEqual(end - start, 3) - self.assertLessEqual(end - start, 7) - - def test_repeat(self): - options = PipelineOptions() - start = Timestamp.now() - with beam.Pipeline(options=options) as p: - ret = ( - p | PeriodicStream( - [1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] - assert_that(ret, equal_to(expected)) - end = Timestamp.now() - self.assertGreaterEqual(end - start, 3) - self.assertLessEqual(end - start, 7) - - def test_timestamped_value(self): - options = PipelineOptions() - start = Timestamp.now() - with beam.Pipeline(options=options) as p: - ret = ( - p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2), - (Timestamp(2), 3), (Timestamp(1), 4)], - interval=0.5) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1, 4]), (0, [2]), (0, [3])] - assert_that(ret, equal_to(expected)) - end = Timestamp.now() - self.assertGreaterEqual(end - start, 3) - self.assertLessEqual(end - start, 7) - - def test_stable_output(self): - options = PipelineOptions() - data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3), - (Timestamp(6), 6), (Timestamp(4), 4), (Timestamp(5), 5), - (Timestamp(7), 7), (Timestamp(8), 8), (Timestamp(9), 9), - (Timestamp(10), 10)] - expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - with beam.Pipeline(options=options) as p: - ret = (p | PeriodicStream(data, interval=0.0001)) - assert_that(ret, equal_to(expected)) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.WARNING) - unittest.main() diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 539700948919..f4d1b036a639 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -53,7 +53,6 @@ from apache_beam.metrics import monitoring_infos from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import MetricName -from apache_beam.ml.ts.util import PeriodicStream from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import DirectOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -75,6 +74,7 @@ from apache_beam.transforms import userstate from apache_beam.transforms import window from apache_beam.transforms.periodicsequence import PeriodicImpulse +from apache_beam.transforms.periodicsequence import PeriodicStream from 
apache_beam.utils import timestamp from apache_beam.utils import windowed_value diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index d06651a9504b..046d8975f152 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -17,6 +17,9 @@ import math import time +from typing import Any +from typing import Optional +from typing import Sequence import apache_beam as beam from apache_beam.io.restriction_trackers import OffsetRange @@ -91,7 +94,39 @@ class ImpulseSeqGenDoFn(beam.DoFn): ImpulseSeqGenDoFn can't guarantee that each element is output at exact time. ImpulseSeqGenDoFn guarantees that elements would not be output prior to given runtime timestamp. + + The output mode of the DoFn is based on the input `data`: + + - **None**: If `data` is None (by default), the output element will be the + timestamp. + - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values + (e.g., `[v1, v2, ...]`), the DoFn will assign a timestamp to each + emitted element. The timestamps are calculated by starting at a given + `start_time` and incrementing by a fixed `interval`. + - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each + tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn + will use the provided timestamp for the emitted element. ''' + def __init__(self, data: Optional[Sequence[Any]] = None): + self._data = data + assert self._data is None or len(self._data) > 0 + self._len = len(self._data) if self._data is not None else 0 + self._is_timestamped_value = self._data is not None and self._len > 0 and \ + isinstance(self._data[0], tuple) and \ + isinstance(self._data[0][0], timestamp.Timestamp) + + def _get_output(self, index, current_output_timestamp): + if self._data is None: + return current_output_timestamp, Timestamp.of(current_output_timestamp) + + if self._is_timestamped_value: + event_time, value = self._data[index % self._len] + return TimestampedValue(value, event_time), event_time + else: + value = self._data[index % self._len] + return TimestampedValue(value, current_output_timestamp), \ + Timestamp.of(current_output_timestamp) + @beam.DoFn.unbounded_per_element() def process( self, @@ -114,24 +149,31 @@ def process( assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView) current_output_index = restriction_tracker.current_restriction().start - current_output_timestamp = start + interval * current_output_index - current_time = time.time() - watermark_estimator.set_watermark( - timestamp.Timestamp(current_output_timestamp)) - - while current_output_timestamp <= current_time: - if restriction_tracker.try_claim(current_output_index): - yield current_output_timestamp - current_output_index += 1 - current_output_timestamp = start + interval * current_output_index - current_time = time.time() - watermark_estimator.set_watermark( + + while True: + current_output_timestamp = start + interval * current_output_index + + if current_output_timestamp > time.time(): + # we are too ahead of time, let's wait. 
+ restriction_tracker.defer_remainder( timestamp.Timestamp(current_output_timestamp)) - else: return - restriction_tracker.defer_remainder( - timestamp.Timestamp(current_output_timestamp)) + if not restriction_tracker.try_claim(current_output_index): + # nothing to claim, just stop + return + + output, output_ts = self._get_output(current_output_index, + current_output_timestamp) + + current_watermark = watermark_estimator.current_watermark() + if current_watermark is None or output_ts > current_watermark: + # ensure watermark is monotonic + watermark_estimator.set_watermark(output_ts) + + yield output + + current_output_index += 1 class PeriodicSequence(PTransform): @@ -202,3 +244,79 @@ def expand(self, pbegin): result = result | 'ApplyWindowing' >> beam.WindowInto( window.FixedWindows(self.interval)) return result + + +class PeriodicStream(beam.PTransform): + """A PTransform that generates a periodic stream of elements from a sequence. + + This transform creates a `PCollection` by emitting elements from a provided + Python sequence at a specified time interval. It is designed for use in + streaming pipelines to simulate a live, continuous source of data. + + The transform can be configured to: + - Emit the sequence only once. + - Repeat the sequence indefinitely or for a maximum duration. + - Control the time interval between elements. + + To ensure that the stream does not emit a burst of elements immediately at + pipeline startup, a fixed warmup period is added before the first element + is generated. + + Args: + data: The sequence of elements to emit into the PCollection. The elements + can be raw values or pre-timestamped tuples in the format + `(apache_beam.utils.timestamp.Timestamp, value)`. + max_duration: The maximum total duration in seconds for the stream + generation. If `None` (the default) and `repeat` is `True`, the + stream is effectively infinite. If `repeat` is `False`, the stream's + duration is the shorter of this value and the time required to emit + the sequence once. + interval: The delay in seconds between consecutive elements. + Defaults to 0.1. + repeat: If `True`, the input `data` sequence is emitted repeatedly. + If `False` (the default), the sequence is emitted only once. + warmup_time: The extra wait time for the impulse element + (start, end, interval) to reach `ImpulseSeqGenDoFn`. It is used to + avoid the events clustering at the beginning. + """ + def __init__( + self, + data: Sequence[Any], + max_duration: Optional[float] = None, + interval: float = 0.1, + repeat: bool = False, + warmup_time: float = 2.0): + self._data = data + self._interval = interval + self._repeat = repeat + self._warmup_time = warmup_time + + # In `ImpulseSeqGenRestrictionProvider`, the total number of counts + # (i.e. total_outputs) is computed by ceil((end - start) / interval), + # where end is start + duration. + # Due to precision error of arithmetic operations, even if duration is set + # to len(self._data) * interval, (end - start) / interval could be a little + # bit smaller or bigger than len(self._data). + # In case of being bigger, total_outputs would be len(self._data) + 1, + # as the ceil() operation is used. + # Assuming that the precision error is no bigger than 1%, by subtracting + # a small amount, we ensure that the result after ceil is stable even if + # the precision error is present. 
+ self._duration = len(self._data) * interval - 0.01 * interval + self._max_duration = max_duration if max_duration is not None else float( + "inf") + + def expand(self, pbegin): + start = timestamp.Timestamp.now() + self._warmup_time + + if not self._repeat: + stop = start + min(self._duration, self._max_duration) + else: + stop = timestamp.MAX_TIMESTAMP if math.isinf( + self._max_duration) else start + self._max_duration + + result = ( + pbegin + | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)]) + | 'GenStream' >> beam.ParDo(ImpulseSeqGenDoFn(self._data))) + return result diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index 221520c94622..53629ac102a1 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -28,9 +28,14 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms import trigger +from apache_beam.transforms import window from apache_beam.transforms.periodicsequence import PeriodicImpulse from apache_beam.transforms.periodicsequence import PeriodicSequence +from apache_beam.transforms.periodicsequence import PeriodicStream from apache_beam.transforms.periodicsequence import _sequence_backlog_bytes +from apache_beam.transforms.window import FixedWindows +from apache_beam.utils.timestamp import Timestamp # Disable frequent lint warning due to pipe operator for chaining transforms. # pylint: disable=expression-not-assigned @@ -133,5 +138,65 @@ def test_periodicsequence_output_size(self): _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) +class PeriodicStreamTest(unittest.TestCase): + def test_processing_time(self): + with TestPipeline() as p: + ret = ( + p | PeriodicStream([10, 20, 30], interval=2) + | beam.Map(lambda _: time.time()) + | beam.WindowInto( + window.GlobalWindows(), + trigger=trigger.Repeatedly(trigger.AfterCount(3)), + accumulation_mode=trigger.AccumulationMode.DISCARDING, + ) + | beam.GroupBy() + | beam.FlatMap(lambda x: [v - min(x[1]) for v in x[1]])) + expected = [0, 2, 4] + assert_that(ret, equal_to(expected, lambda x, y: abs(x - y) < 0.5)) + + def test_interval(self): + with TestPipeline() as p: + ret = ( + p | PeriodicStream([1, 2, 3, 4], interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])] + assert_that(ret, equal_to(expected)) + + def test_repeat(self): + with TestPipeline() as p: + ret = ( + p | PeriodicStream( + [1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] + assert_that(ret, equal_to(expected)) + + def test_timestamped_value(self): + with TestPipeline() as p: + ret = ( + p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2), + (Timestamp(2), 3), (Timestamp(1), 4)], + interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1, 4]), (0, [2]), (0, [3])] + assert_that(ret, equal_to(expected)) + + def test_stable_output(self): + data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3), + (Timestamp(6), 6), (Timestamp(4), 4), (Timestamp(5), 5), + (Timestamp(7), 7), (Timestamp(8), 8), (Timestamp(9), 
9), + (Timestamp(10), 10)] + expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + with TestPipeline() as p: + ret = (p | PeriodicStream(data, interval=0.0001)) + assert_that(ret, equal_to(expected)) + + if __name__ == '__main__': unittest.main() From 30757b495fcb99c1c679a9db2fd2a773da1d70a9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 26 Jun 2025 15:51:13 -0400 Subject: [PATCH 2/8] Consolidate PeriodicStream into PeriodicImpulse. --- .../transforms/periodicsequence.py | 171 ++++++++---------- .../transforms/periodicsequence_test.py | 61 +++++-- 2 files changed, 120 insertions(+), 112 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 046d8975f152..550629c6c2d4 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -17,9 +17,11 @@ import math import time +import warnings from typing import Any from typing import Optional from typing import Sequence +from typing import Union import apache_beam as beam from apache_beam.io.restriction_trackers import OffsetRange @@ -111,21 +113,21 @@ def __init__(self, data: Optional[Sequence[Any]] = None): self._data = data assert self._data is None or len(self._data) > 0 self._len = len(self._data) if self._data is not None else 0 - self._is_timestamped_value = self._data is not None and self._len > 0 and \ + self._is_pre_timestamped = self._data is not None and self._len > 0 and \ isinstance(self._data[0], tuple) and \ isinstance(self._data[0][0], timestamp.Timestamp) def _get_output(self, index, current_output_timestamp): if self._data is None: - return current_output_timestamp, Timestamp.of(current_output_timestamp) + return TimestampedValue( + current_output_timestamp, current_output_timestamp) - if self._is_timestamped_value: + if self._is_pre_timestamped: event_time, value = self._data[index % self._len] - return TimestampedValue(value, event_time), event_time + return TimestampedValue(value, event_time) else: value = self._data[index % self._len] - return TimestampedValue(value, current_output_timestamp), \ - Timestamp.of(current_output_timestamp) + return TimestampedValue(value, current_output_timestamp) @beam.DoFn.unbounded_per_element() def process( @@ -163,13 +165,12 @@ def process( # nothing to claim, just stop return - output, output_ts = self._get_output(current_output_index, - current_output_timestamp) + output = self._get_output(current_output_index, current_output_timestamp) current_watermark = watermark_estimator.current_watermark() - if current_watermark is None or output_ts > current_watermark: + if current_watermark is None or output.timestamp > current_watermark: # ensure watermark is monotonic - watermark_estimator.set_watermark(output_ts) + watermark_estimator.set_watermark(output.timestamp) yield output @@ -215,108 +216,94 @@ class PeriodicImpulse(PTransform): but can be used as first transform in pipeline. The PCollection generated by PeriodicImpulse is unbounded. ''' + + def _validate_and_adjust_duration(self): + assert self.data + + # The total time we need to impulse all the data. 
+ data_duration = (len(self.data) - 1) * self.interval + + is_pre_timestamped = isinstance(self.data[0], tuple) and \ + isinstance(self.data[0][0], timestamp.Timestamp) + + if isinstance(self.start_ts, Timestamp): + start = self.start_ts.micros / 1000000 + else: + start = self.start_ts + + if isinstance(self.stop_ts, Timestamp): + if self.stop_ts == MAX_TIMESTAMP: + # adjust stop timestamp to match the data duration + end = start + data_duration + if self.interval > 1e-6: + end += 1e-6 + self.stop_ts = Timestamp.of(end) + else: + end = self.stop_ts.micros / 1000000 + else: + end = self.stop_ts + + # The total time for the impulse signal which occurs in [start, end). + impulse_duration = end - start + if data_duration + self.interval < impulse_duration: + # We don't have enough data for the impulse. + # If we can fit at least one more data point in the impulse duration, + # then we will be in the repeat mode. + message = 'The number of elements in the provided pre-timestamped ' \ + 'data sequence is not enough to span the full impulse duration. ' \ + f'Expected duration: {impulse_duration:.6f}, ' \ + f'actual data duration: {data_duration:.6f}.' + + if is_pre_timestamped: + raise ValueError( + f'{message} Please either provide more data or decrease ' + '`stop_timestamp`.') + else: + warnings.warn( + f'{message} As a result, the data sequence will be repeated to ' + 'generate elements for the entire duration.') + def __init__( self, - start_timestamp=Timestamp.now(), - stop_timestamp=MAX_TIMESTAMP, - fire_interval=360.0, - apply_windowing=False): + start_timestamp: Union[Timestamp, float] = Timestamp.now(), + stop_timestamp: Union[Timestamp, float] = MAX_TIMESTAMP, + fire_interval: float = 360.0, + apply_windowing: bool = False, + data: Optional[Sequence[Any]] = None): ''' :param start_timestamp: Timestamp for first element. :param stop_timestamp: Timestamp after which no elements will be output. :param fire_interval: Interval in seconds at which to output elements. :param apply_windowing: Whether each element should be assigned to individual window. If false, all elements will reside in global window. + :param data: The sequence of elements to emit into the PCollection. + The elements can be raw values or pre-timestamped tuples in the format + `(apache_beam.utils.timestamp.Timestamp, value)`. ''' self.start_ts = start_timestamp self.stop_ts = stop_timestamp self.interval = fire_interval self.apply_windowing = apply_windowing + self.data = data + + if self.data: + self._validate_and_adjust_duration() def expand(self, pbegin): result = ( pbegin | 'ImpulseElement' >> beam.Create( [(self.start_ts, self.stop_ts, self.interval)]) - | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn()) - | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))) + | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn(self.data))) + + if not self.data: + # This step is only to ensure the current PTransform expansion is + # compatible with the previous Beam versions. + result = ( + result + | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))) + if self.apply_windowing: result = result | 'ApplyWindowing' >> beam.WindowInto( window.FixedWindows(self.interval)) return result - - -class PeriodicStream(beam.PTransform): - """A PTransform that generates a periodic stream of elements from a sequence. - - This transform creates a `PCollection` by emitting elements from a provided - Python sequence at a specified time interval. 
It is designed for use in - streaming pipelines to simulate a live, continuous source of data. - - The transform can be configured to: - - Emit the sequence only once. - - Repeat the sequence indefinitely or for a maximum duration. - - Control the time interval between elements. - - To ensure that the stream does not emit a burst of elements immediately at - pipeline startup, a fixed warmup period is added before the first element - is generated. - - Args: - data: The sequence of elements to emit into the PCollection. The elements - can be raw values or pre-timestamped tuples in the format - `(apache_beam.utils.timestamp.Timestamp, value)`. - max_duration: The maximum total duration in seconds for the stream - generation. If `None` (the default) and `repeat` is `True`, the - stream is effectively infinite. If `repeat` is `False`, the stream's - duration is the shorter of this value and the time required to emit - the sequence once. - interval: The delay in seconds between consecutive elements. - Defaults to 0.1. - repeat: If `True`, the input `data` sequence is emitted repeatedly. - If `False` (the default), the sequence is emitted only once. - warmup_time: The extra wait time for the impulse element - (start, end, interval) to reach `ImpulseSeqGenDoFn`. It is used to - avoid the events clustering at the beginning. - """ - def __init__( - self, - data: Sequence[Any], - max_duration: Optional[float] = None, - interval: float = 0.1, - repeat: bool = False, - warmup_time: float = 2.0): - self._data = data - self._interval = interval - self._repeat = repeat - self._warmup_time = warmup_time - - # In `ImpulseSeqGenRestrictionProvider`, the total number of counts - # (i.e. total_outputs) is computed by ceil((end - start) / interval), - # where end is start + duration. - # Due to precision error of arithmetic operations, even if duration is set - # to len(self._data) * interval, (end - start) / interval could be a little - # bit smaller or bigger than len(self._data). - # In case of being bigger, total_outputs would be len(self._data) + 1, - # as the ceil() operation is used. - # Assuming that the precision error is no bigger than 1%, by subtracting - # a small amount, we ensure that the result after ceil is stable even if - # the precision error is present. 
- self._duration = len(self._data) * interval - 0.01 * interval - self._max_duration = max_duration if max_duration is not None else float( - "inf") - - def expand(self, pbegin): - start = timestamp.Timestamp.now() + self._warmup_time - - if not self._repeat: - stop = start + min(self._duration, self._max_duration) - else: - stop = timestamp.MAX_TIMESTAMP if math.isinf( - self._max_duration) else start + self._max_duration - - result = ( - pbegin - | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)]) - | 'GenStream' >> beam.ParDo(ImpulseSeqGenDoFn(self._data))) - return result diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index 53629ac102a1..ffeb80c31f6f 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -32,7 +32,6 @@ from apache_beam.transforms import window from apache_beam.transforms.periodicsequence import PeriodicImpulse from apache_beam.transforms.periodicsequence import PeriodicSequence -from apache_beam.transforms.periodicsequence import PeriodicStream from apache_beam.transforms.periodicsequence import _sequence_backlog_bytes from apache_beam.transforms.window import FixedWindows from apache_beam.utils.timestamp import Timestamp @@ -137,12 +136,16 @@ def test_periodicsequence_output_size(self): self.assertEqual( _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) - -class PeriodicStreamTest(unittest.TestCase): + @unittest.skip("hard to determine warm-up time and threshold for runners.") def test_processing_time(self): + warmup_time = 3 + threshold = 0.5 with TestPipeline() as p: ret = ( - p | PeriodicStream([10, 20, 30], interval=2) + p | PeriodicImpulse( + start_timestamp=Timestamp.now() + warmup_time, + data=[10, 20, 30], + fire_interval=2) | beam.Map(lambda _: time.time()) | beam.WindowInto( window.GlobalWindows(), @@ -152,12 +155,12 @@ def test_processing_time(self): | beam.GroupBy() | beam.FlatMap(lambda x: [v - min(x[1]) for v in x[1]])) expected = [0, 2, 4] - assert_that(ret, equal_to(expected, lambda x, y: abs(x - y) < 0.5)) + assert_that(ret, equal_to(expected, lambda x, y: abs(x - y) < threshold)) def test_interval(self): with TestPipeline() as p: ret = ( - p | PeriodicStream([1, 2, 3, 4], interval=0.5) + p | PeriodicImpulse(data=[1, 2, 3, 4], fire_interval=0.5) | beam.WindowInto(FixedWindows(0.5)) | beam.WithKeys(0) | beam.GroupByKey()) @@ -165,36 +168,54 @@ def test_interval(self): assert_that(ret, equal_to(expected)) def test_repeat(self): - with TestPipeline() as p: - ret = ( - p | PeriodicStream( - [1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] - assert_that(ret, equal_to(expected)) + now = Timestamp.now() + with self.assertWarnsRegex(UserWarning, "not enough to span"): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, + stop_timestamp=now + 2.6, + data=[1, 2, 3, 4], + fire_interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] + assert_that(ret, equal_to(expected)) def test_timestamped_value(self): with TestPipeline() as p: ret = ( - p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2), - (Timestamp(2), 3), (Timestamp(1), 4)], - interval=0.5) 
+ p | PeriodicImpulse( + data=[(Timestamp(1), 1), (Timestamp(3), 2), (Timestamp(2), 3), + (Timestamp(1), 4)], + fire_interval=0.5) | beam.WindowInto(FixedWindows(0.5)) | beam.WithKeys(0) | beam.GroupByKey()) expected = [(0, [1, 4]), (0, [2]), (0, [3])] assert_that(ret, equal_to(expected)) - def test_stable_output(self): + def test_not_enough_timestamped_value(self): + now = Timestamp.now() + data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3)] + with self.assertRaisesRegex(ValueError, "not enough to span"): + with TestPipeline() as p: + _ = ( + p | PeriodicImpulse( + start_timestamp=now, + stop_timestamp=now + 2.6, + data=data, + fire_interval=0.5)) + + def test_small_interval(self): data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3), (Timestamp(6), 6), (Timestamp(4), 4), (Timestamp(5), 5), (Timestamp(7), 7), (Timestamp(8), 8), (Timestamp(9), 9), (Timestamp(10), 10)] expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with TestPipeline() as p: - ret = (p | PeriodicStream(data, interval=0.0001)) + ret = (p | PeriodicImpulse(data=data, fire_interval=0.0001)) assert_that(ret, equal_to(expected)) From 2eb4c903abf7146c68a0b30362d70c4f59288131 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 26 Jun 2025 16:04:36 -0400 Subject: [PATCH 3/8] Fix fn_runner_test that used PeriodicStream. --- .../runners/portability/fn_api_runner/fn_runner_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index f4d1b036a639..20e814b07269 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -74,7 +74,6 @@ from apache_beam.transforms import userstate from apache_beam.transforms import window from apache_beam.transforms.periodicsequence import PeriodicImpulse -from apache_beam.transforms.periodicsequence import PeriodicStream from apache_beam.utils import timestamp from apache_beam.utils import windowed_value @@ -1264,7 +1263,7 @@ def test_sliding_windows(self): with self.create_pipeline() as p: ret = ( p - | PeriodicStream(data, interval=1) + | PeriodicImpulse(data=data, fire_interval=1) | beam.WithKeys(0) | beam.WindowInto(beam.transforms.window.SlidingWindows(6, 3)) | beam.GroupByKey()) From 66f0d516e099cf9741d993e4648f3e14065a9fe0 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 26 Jun 2025 16:07:19 -0400 Subject: [PATCH 4/8] Reformat --- sdks/python/apache_beam/transforms/periodicsequence.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 550629c6c2d4..a6b16065e439 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -216,7 +216,6 @@ class PeriodicImpulse(PTransform): but can be used as first transform in pipeline. The PCollection generated by PeriodicImpulse is unbounded. ''' - def _validate_and_adjust_duration(self): assert self.data From fd153a501fc547e96d88ef8568d534e499ff74e0 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 26 Jun 2025 22:02:06 -0400 Subject: [PATCH 5/8] Fix some edge cases and resolve floating point precision problem. Add tests. 
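
The restriction size is now computed in integer microseconds instead of
floating-point seconds, so ceil() no longer over-counts the number of
outputs. For illustration only (the values below are chosen to show the
rounding problem and are not taken from the change itself):

    import math

    interval = 0.1
    start, end = 0.0, 3 * interval  # should yield exactly 3 outputs

    # Previous computation in floating-point seconds over-counts by one:
    # 3*0.1/0.1 == 3.0000000000000004, so ceil() returns 4.
    print(math.ceil((end - start) / interval))  # 4

    # Computing with rounded integer microseconds is stable:
    print(math.ceil(round((end - start) * 1e6) / round(interval * 1e6)))  # 3
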
--- .../transforms/periodicsequence.py | 33 ++++---- .../transforms/periodicsequence_test.py | 76 ++++++++++++++++--- 2 files changed, 87 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index a6b16065e439..d5d620466675 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -21,7 +21,6 @@ from typing import Any from typing import Optional from typing import Sequence -from typing import Union import apache_beam as beam from apache_beam.io.restriction_trackers import OffsetRange @@ -41,13 +40,21 @@ class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider): def initial_restriction(self, element): start, end, interval = element if isinstance(start, Timestamp): - start = start.micros / 1000000 + start_micros = start.micros + else: + start_micros = round(start * 1000000) + if isinstance(end, Timestamp): - end = end.micros / 1000000 + end_micros = end.micros + else: + end_micros = round(end * 1000000) + + interval_micros = round(interval * 1000000) - assert start <= end + assert start_micros <= end_micros assert interval > 0 - total_outputs = math.ceil((end - start) / interval) + delta_micros: int = end_micros - start_micros + total_outputs = math.ceil(delta_micros / interval_micros) return OffsetRange(0, total_outputs) def create_tracker(self, restriction): @@ -232,11 +239,11 @@ def _validate_and_adjust_duration(self): if isinstance(self.stop_ts, Timestamp): if self.stop_ts == MAX_TIMESTAMP: - # adjust stop timestamp to match the data duration - end = start + data_duration - if self.interval > 1e-6: - end += 1e-6 - self.stop_ts = Timestamp.of(end) + # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the + # data's actual end time plus an extra fire interval, because the + # impulse duration's upper bound is exclusive. + end = start + data_duration + self.interval + self.stop_ts = Timestamp(micros=end * 1000000) else: end = self.stop_ts.micros / 1000000 else: @@ -244,7 +251,7 @@ def _validate_and_adjust_duration(self): # The total time for the impulse signal which occurs in [start, end). impulse_duration = end - start - if data_duration + self.interval < impulse_duration: + if round(data_duration + self.interval, 6) < round(impulse_duration, 6): # We don't have enough data for the impulse. # If we can fit at least one more data point in the impulse duration, # then we will be in the repeat mode. 
@@ -264,8 +271,8 @@ def _validate_and_adjust_duration(self): def __init__( self, - start_timestamp: Union[Timestamp, float] = Timestamp.now(), - stop_timestamp: Union[Timestamp, float] = MAX_TIMESTAMP, + start_timestamp: Timestamp = Timestamp.now(), + stop_timestamp: Timestamp = MAX_TIMESTAMP, fire_interval: float = 360.0, apply_windowing: bool = False, data: Optional[Sequence[Any]] = None): diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index ffeb80c31f6f..e3478217016c 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -19,10 +19,14 @@ # pytype: skip-file +import logging import inspect +import random import time import unittest +from parameterized import parameterized + import apache_beam as beam from apache_beam.io.restriction_trackers import OffsetRange from apache_beam.testing.test_pipeline import TestPipeline @@ -157,6 +161,53 @@ def test_processing_time(self): expected = [0, 2, 4] assert_that(ret, equal_to(expected, lambda x, y: abs(x - y) < threshold)) + @parameterized.expand([0.5, 1, 2, 10]) + def test_stop_over_by_epsilon(self, interval): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp(seconds=1), + stop_timestamp=Timestamp(seconds=1, micros=1), + data=[1, 2], + fire_interval=interval) + | beam.WindowInto(FixedWindows(interval)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [ + (0, [1]), + ] + assert_that(ret, equal_to(expected)) + + @parameterized.expand([1, 2]) + def test_stop_over_by_interval(self, interval): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp(seconds=1), + stop_timestamp=Timestamp(seconds=1 + interval), + data=[1, 2], + fire_interval=interval) + | beam.WindowInto(FixedWindows(interval)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1])] + assert_that(ret, equal_to(expected)) + + @parameterized.expand([1, 2]) + def test_stop_over_by_interval_and_epsilon(self, interval): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp(seconds=1), + stop_timestamp=Timestamp(seconds=1 + interval, micros=1), + data=[1, 2], + fire_interval=interval) + | beam.WindowInto(FixedWindows(interval)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2])] + assert_that(ret, equal_to(expected)) + def test_interval(self): with TestPipeline() as p: ret = ( @@ -208,15 +259,22 @@ def test_not_enough_timestamped_value(self): data=data, fire_interval=0.5)) - def test_small_interval(self): - data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3), - (Timestamp(6), 6), (Timestamp(4), 4), (Timestamp(5), 5), - (Timestamp(7), 7), (Timestamp(8), 8), (Timestamp(9), 9), - (Timestamp(10), 10)] - expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - with TestPipeline() as p: - ret = (p | PeriodicImpulse(data=data, fire_interval=0.0001)) - assert_that(ret, equal_to(expected)) + def test_fuzzy_interval(self): + seed = int(time.time() * 1000) + times = 30 + logging.warning("random seed=%d", seed) + random.seed(seed) + for _ in range(times): + n = int(random.randint(1, 100)) + data = list(range(n)) + m = random.randint(1, 1000) + interval = m / 1e6 + now = Timestamp.now() + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, data=data, fire_interval=interval)) + assert_that(ret, equal_to(data)) if __name__ == '__main__': From 
523e1306861a507327a23e8023f07160fdc0521d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 27 Jun 2025 07:54:50 -0400 Subject: [PATCH 6/8] Fix lint and move tests for periodic impulse to its own test case. --- .../transforms/periodicsequence_test.py | 78 ++++++++++--------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index e3478217016c..fdf0995f8e5a 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -19,8 +19,8 @@ # pytype: skip-file -import logging import inspect +import logging import random import time import unittest @@ -65,7 +65,44 @@ def test_periodicsequence_outputs_valid_sequence(self): self.assertEqual(result.is_bounded, False) assert_that(result, equal_to(k)) - def test_periodicimpulse_windowing_on_si(self): + def test_periodicsequence_outputs_valid_sequence_in_past(self): + start_offset = -10000 + it = time.time() + start_offset + duration = 5 + et = it + duration + interval = 1 + + with TestPipeline() as p: + result = ( + p + | 'ImpulseElement' >> beam.Create([(it, et, interval)]) + | 'ImpulseSeqGen' >> PeriodicSequence()) + + k = [it + x * interval for x in range(0, int(duration / interval), 1)] + self.assertEqual(result.is_bounded, False) + assert_that(result, equal_to(k)) + + def test_periodicsequence_output_size(self): + element = [0, 1000000000, 10] + self.assertEqual( + _sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0) + self.assertEqual( + _sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8) + self.assertEqual( + _sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16) + self.assertEqual( + _sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8) + self.assertEqual( + _sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)), + 8 * 10000 / 10) + self.assertEqual( + _sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0) + self.assertEqual( + _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) + + +class PeriodicImpulseTest(unittest.TestCase): + def test_windowing_on_si(self): start_offset = -15 it = time.time() + start_offset duration = 15 @@ -85,7 +122,7 @@ def test_periodicimpulse_windowing_on_si(self): for x in range(0, int(duration / interval), 1)] assert_that(actual, equal_to(k)) - def test_periodicimpulse_default_start(self): + def test_default_start(self): default_parameters = inspect.signature(PeriodicImpulse.__init__).parameters it = default_parameters["start_timestamp"].default duration = 1 @@ -105,41 +142,6 @@ def test_periodicimpulse_default_start(self): self.assertEqual(result.is_bounded, False) assert_that(result, equal_to(k)) - def test_periodicsequence_outputs_valid_sequence_in_past(self): - start_offset = -10000 - it = time.time() + start_offset - duration = 5 - et = it + duration - interval = 1 - - with TestPipeline() as p: - result = ( - p - | 'ImpulseElement' >> beam.Create([(it, et, interval)]) - | 'ImpulseSeqGen' >> PeriodicSequence()) - - k = [it + x * interval for x in range(0, int(duration / interval), 1)] - self.assertEqual(result.is_bounded, False) - assert_that(result, equal_to(k)) - - def test_periodicsequence_output_size(self): - element = [0, 1000000000, 10] - self.assertEqual( - _sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0) - self.assertEqual( - _sequence_backlog_bytes(element, 100, 
OffsetRange(9, 100000000)), 8) - self.assertEqual( - _sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16) - self.assertEqual( - _sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8) - self.assertEqual( - _sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)), - 8 * 10000 / 10) - self.assertEqual( - _sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0) - self.assertEqual( - _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) - @unittest.skip("hard to determine warm-up time and threshold for runners.") def test_processing_time(self): warmup_time = 3 From 109b2bb867ce4d14f38135cb6ce2440d8a55902e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 27 Jun 2025 08:50:03 -0400 Subject: [PATCH 7/8] Add more docstrings. Revise stop_timestamp part as it is an exclusive upper bound. --- .../transforms/periodicsequence.py | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index d5d620466675..49a49250ea71 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -89,8 +89,10 @@ class ImpulseSeqGenDoFn(beam.DoFn): ''' ImpulseSeqGenDoFn fn receives tuple elements with three parts: - * first_timestamp = first timestamp to output element for. - * last_timestamp = last timestamp/time to output element for. + * first_timestamp = The timestamp of the first element to be generated + (inclusive). + * last_timestamp = The timestamp marking the end of the generation period + (exclusive). No elements will be generated at or after this time. * fire_interval = how often to fire an element. For each input element received, ImpulseSeqGenDoFn fn will start @@ -110,11 +112,12 @@ class ImpulseSeqGenDoFn(beam.DoFn): timestamp. - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values (e.g., `[v1, v2, ...]`), the DoFn will assign a timestamp to each - emitted element. The timestamps are calculated by starting at a given - `start_time` and incrementing by a fixed `interval`. + emitted element. - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn will use the provided timestamp for the emitted element. + + See the parameter description of `PeriodicImpulse` for more information. ''' def __init__(self, data: Optional[Sequence[Any]] = None): self._data = data @@ -278,13 +281,33 @@ def __init__( data: Optional[Sequence[Any]] = None): ''' :param start_timestamp: Timestamp for first element. - :param stop_timestamp: Timestamp after which no elements will be output. + :param stop_timestamp: Timestamp at or after which no elements will be + output. :param fire_interval: Interval in seconds at which to output elements. :param apply_windowing: Whether each element should be assigned to individual window. If false, all elements will reside in global window. - :param data: The sequence of elements to emit into the PCollection. - The elements can be raw values or pre-timestamped tuples in the format - `(apache_beam.utils.timestamp.Timestamp, value)`. + :param data: A sequence of elements to emit. 
The behavior depends on the + content: + - **None (default):** The transform emits the event timestamps as + the element values, starting from start_timestamp and incrementing by + `fire_interval` up to the `stop_timestamp` (exclusive) + - **Sequence of raw values (e.g., `['a', 'b']`)**: The transform emits + each value in the sequence, assigning it an event timestamp that is + calculated in the same manner as the default scenario. The sequence + is repeated if the impulse duration requires more elements than + are in the sequence (a warning will be given in this case). + - **Sequence of pre-timestamped tuples (e.g., + `[(t1, v1), (t2, v2)]`)**: The transform emits each value with its + explicitly provided event time. The format must be + `(apache_beam.utils.timestamp.Timestamp, value)`. The provided + timestamps are used directly, overriding the calculated ones. + Note that the elements in the sequence is NOT required to be ordered + by event time; an element with a timestamp earlier than a preceding one + will be treated as a potential late event. + **Important**: In this mode, the number of elements in `data` must be + sufficient to cover the duration defined by `start_timestamp`, + `stop_timestamp`, and `fire_interval`; otherwise, a `ValueError` is + raised. ''' self.start_ts = start_timestamp self.stop_ts = stop_timestamp From 2c3cc51d0af0055d403c0f1d7ea5a6d90b45f5b7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 27 Jun 2025 11:52:04 -0400 Subject: [PATCH 8/8] Fix pydoc error. --- sdks/python/apache_beam/transforms/periodicsequence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 49a49250ea71..daab9d42387b 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -288,6 +288,7 @@ def __init__( individual window. If false, all elements will reside in global window. :param data: A sequence of elements to emit. The behavior depends on the content: + - **None (default):** The transform emits the event timestamps as the element values, starting from start_timestamp and incrementing by `fire_interval` up to the `stop_timestamp` (exclusive)