From 2dedf60c0e40d99f8505f747db63e7d3d4989da6 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Jun 2025 23:43:12 -0400 Subject: [PATCH 1/3] Add PeriodicStream in the new time series folder. --- sdks/python/apache_beam/ml/ts/__init__.py | 16 +++ sdks/python/apache_beam/ml/ts/util.py | 145 +++++++++++++++++++++ sdks/python/apache_beam/ml/ts/util_test.py | 82 ++++++++++++ 3 files changed, 243 insertions(+) create mode 100644 sdks/python/apache_beam/ml/ts/__init__.py create mode 100644 sdks/python/apache_beam/ml/ts/util.py create mode 100644 sdks/python/apache_beam/ml/ts/util_test.py diff --git a/sdks/python/apache_beam/ml/ts/__init__.py b/sdks/python/apache_beam/ml/ts/__init__.py new file mode 100644 index 000000000000..ecb1860df848 --- /dev/null +++ b/sdks/python/apache_beam/ml/ts/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# \ No newline at end of file diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py new file mode 100644 index 000000000000..c4767089acd2 --- /dev/null +++ b/sdks/python/apache_beam/ml/ts/util.py @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import time +from typing import Any +from typing import Optional +from typing import Sequence + +import apache_beam as beam +from apache_beam.io.watermark_estimators import ManualWatermarkEstimator +from apache_beam.runners import sdf_utils +from apache_beam.transforms.periodicsequence import ImpulseSeqGenRestrictionProvider # pylint:disable=line-too-long +from apache_beam.transforms.window import TimestampedValue +from apache_beam.utils import timestamp + + +class ImpulseStreamGenDoFn(beam.DoFn): + ''' + Generates a periodic, unbounded stream of elements from a provided sequence. + + (Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence) + + This Splittable DoFn (SDF) is designed to simulate a continuous stream of + data for testing or demonstration purposes. It takes a Python sequence (e.g., + a list) and emits its elements one by one in a loop, assigning a timestamp + to each. + + The DoFn operates in two modes based on the structure of the input `data`: + 1. **Non-Timestamped Data**: If `data` is a sequence of arbitrary values + (e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each + emitted element. The timestamps are calculated by starting at a given + `start_time` and incrementing by a fixed `interval`. + 2. **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each + tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn + will use the provided timestamp for the emitted element. + + The rate of emission is controlled by wall-clock time. The DoFn will only + emit elements whose timestamp (either calculated or provided) is in the past + compared to the current system time. When it "catches up" to the present, + it will pause and defer the remainder of the work. + ''' + def __init__(self, data: Sequence[Any]): + self._data = data + self._len = len(data) + self._is_timestamped_value = len(data) > 0 and isinstance( + data[0], tuple) and isinstance(data[0][0], timestamp.Timestamp) + + def _get_timestamped_value(self, index, current_output_timestamp): + if self._is_timestamped_value: + event_time, value = self._data[index % self._len] + return TimestampedValue(value, event_time) + else: + value = self._data[index % self._len] + return TimestampedValue(value, current_output_timestamp) + + @beam.DoFn.unbounded_per_element() + def process( + self, + element, + restriction_tracker=beam.DoFn.RestrictionParam( + ImpulseSeqGenRestrictionProvider()), + watermark_estimator=beam.DoFn.WatermarkEstimatorParam( + ManualWatermarkEstimator.default_provider())): + start, _, interval = element + + if isinstance(start, timestamp.Timestamp): + start = start.micros / 1000000 + + assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView) + + current_output_index = restriction_tracker.current_restriction().start + + while True: + current_output_timestamp = start + interval * current_output_index + + if current_output_timestamp > time.time(): + # we are too ahead of time, let's wait. + restriction_tracker.defer_remainder( + timestamp.Timestamp(current_output_timestamp)) + return + + if not restriction_tracker.try_claim(current_output_index): + # nothing to claim, just stop + return + + output = self._get_timestamped_value( + current_output_index, current_output_timestamp) + + current_watermark = watermark_estimator.current_watermark() + if current_watermark is None or output.timestamp > current_watermark: + # ensure watermark is monotonic + watermark_estimator.set_watermark(output.timestamp) + + yield output + + current_output_index += 1 + + +class PeriodicStream(beam.PTransform): + WARMUP_TIME = 2 + + def __init__( + self, + data: Sequence[Any], + max_duration: Optional[float] = None, + interval: float = 0.1, + repeat: bool = False): + self._data = data + self._interval = interval + self._repeat = repeat + self._duration = len(self._data) * interval + self._max_duration = max_duration if max_duration is not None else float( + "inf") + + def expand(self, pbegin): + # Give the runner some time to start up so the events will not cluster + # at the beginning. + start = timestamp.Timestamp.now() + PeriodicStream.WARMUP_TIME + + if not self._repeat: + stop = start + min(self._duration, self._max_duration) + else: + stop = timestamp.MAX_TIMESTAMP if math.isinf( + self._max_duration) else start + self._max_duration + + result = ( + pbegin + | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)]) + | 'GenStream' >> beam.ParDo(ImpulseStreamGenDoFn(self._data))) + return result diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py new file mode 100644 index 000000000000..298d70192209 --- /dev/null +++ b/sdks/python/apache_beam/ml/ts/util_test.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import unittest + +import apache_beam as beam +from apache_beam.ml.ts.util import PeriodicStream +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.window import FixedWindows +from apache_beam.utils.timestamp import Timestamp + + +class PeriodicStreamTest(unittest.TestCase): + def test_duration(self): + options = PipelineOptions() + start = Timestamp.now() + with beam.Pipeline(options=options) as p: + ret = ( + p | PeriodicStream([1, 2, 3, 4], interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])] + assert_that(ret, equal_to(expected)) + end = Timestamp.now() + self.assertGreaterEqual(end - start, 3) + self.assertLessEqual(end - start, 7) + + def test_repeat(self): + options = PipelineOptions() + start = Timestamp.now() + with beam.Pipeline(options=options) as p: + ret = ( + p | PeriodicStream( + [1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] + assert_that(ret, equal_to(expected)) + end = Timestamp.now() + self.assertGreaterEqual(end - start, 3) + self.assertLessEqual(end - start, 7) + + def test_timestamped_value(self): + options = PipelineOptions() + start = Timestamp.now() + with beam.Pipeline(options=options) as p: + ret = ( + p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2), + (Timestamp(2), 3), (Timestamp(1), 4)], + interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1, 4]), (0, [2]), (0, [3])] + assert_that(ret, equal_to(expected)) + end = Timestamp.now() + self.assertGreaterEqual(end - start, 3) + self.assertLessEqual(end - start, 7) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.WARNING) + unittest.main() From 08245519d0dec182a658c1f2cc53910f8967c980 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 15 Jun 2025 23:23:34 -0400 Subject: [PATCH 2/3] Add some more docstrings and minor fix on test name. --- sdks/python/apache_beam/ml/ts/util.py | 39 ++++++++++++++++++++-- sdks/python/apache_beam/ml/ts/util_test.py | 2 +- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py index c4767089acd2..26f7d43a6610 100644 --- a/sdks/python/apache_beam/ml/ts/util.py +++ b/sdks/python/apache_beam/ml/ts/util.py @@ -30,7 +30,7 @@ class ImpulseStreamGenDoFn(beam.DoFn): - ''' + """ Generates a periodic, unbounded stream of elements from a provided sequence. (Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence) @@ -53,7 +53,12 @@ class ImpulseStreamGenDoFn(beam.DoFn): emit elements whose timestamp (either calculated or provided) is in the past compared to the current system time. When it "catches up" to the present, it will pause and defer the remainder of the work. - ''' + + Args: + data: The sequence of elements to emit into the PCollection. The elements + can be raw values or pre-timestamped tuples in the format + `(apache_beam.utils.timestamp.Timestamp, value)`. + """ def __init__(self, data: Sequence[Any]): self._data = data self._len = len(data) @@ -112,6 +117,36 @@ def process( class PeriodicStream(beam.PTransform): + """A PTransform that generates a periodic stream of elements from a sequence. + + This transform creates a `PCollection` by emitting elements from a provided + Python sequence at a specified time interval. It is designed for use in + streaming pipelines to simulate a live, continuous source of data. + + The transform can be configured to: + - Emit the sequence only once. + - Repeat the sequence indefinitely or for a maximum duration. + - Control the time interval between elements. + + To ensure that the stream does not emit a burst of elements immediately at + pipeline startup, a fixed warmup period is added before the first element + is generated. + + Args: + data: The sequence of elements to emit into the PCollection. The elements + can be raw values or pre-timestamped tuples in the format + `(apache_beam.utils.timestamp.Timestamp, value)`. + max_duration: The maximum total duration in seconds for the stream + generation. If `None` (the default) and `repeat` is `True`, the + stream is effectively infinite. If `repeat` is `False`, the stream's + duration is the shorter of this value and the time required to emit + the sequence once. + interval: The delay in seconds between consecutive elements. + Defaults to 0.1. + repeat: If `True`, the input `data` sequence is emitted repeatedly. + If `False` (the default), the sequence is emitted only once. + """ + WARMUP_TIME = 2 def __init__( diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py index 298d70192209..ac2bc6ea701f 100644 --- a/sdks/python/apache_beam/ml/ts/util_test.py +++ b/sdks/python/apache_beam/ml/ts/util_test.py @@ -28,7 +28,7 @@ class PeriodicStreamTest(unittest.TestCase): - def test_duration(self): + def test_interval(self): options = PipelineOptions() start = Timestamp.now() with beam.Pipeline(options=options) as p: From ef64fbee94742f92d67efa24c85255eb46fc3053 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 15 Jun 2025 23:53:32 -0400 Subject: [PATCH 3/3] Fix lints and docs. --- sdks/python/apache_beam/ml/ts/__init__.py | 2 +- sdks/python/apache_beam/ml/ts/util.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/ts/__init__.py b/sdks/python/apache_beam/ml/ts/__init__.py index ecb1860df848..cce3acad34a4 100644 --- a/sdks/python/apache_beam/ml/ts/__init__.py +++ b/sdks/python/apache_beam/ml/ts/__init__.py @@ -13,4 +13,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# \ No newline at end of file +# diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py index 26f7d43a6610..4005f57e0047 100644 --- a/sdks/python/apache_beam/ml/ts/util.py +++ b/sdks/python/apache_beam/ml/ts/util.py @@ -41,11 +41,12 @@ class ImpulseStreamGenDoFn(beam.DoFn): to each. The DoFn operates in two modes based on the structure of the input `data`: - 1. **Non-Timestamped Data**: If `data` is a sequence of arbitrary values + + - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values (e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each emitted element. The timestamps are calculated by starting at a given `start_time` and incrementing by a fixed `interval`. - 2. **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each + - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn will use the provided timestamp for the emitted element.