diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py deleted file mode 100644 index a50f2667621b..000000000000 --- a/sdks/python/apache_beam/ml/ts/util.py +++ /dev/null @@ -1,193 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import math -import time -from typing import Any -from typing import Optional -from typing import Sequence - -import apache_beam as beam -from apache_beam.io.watermark_estimators import ManualWatermarkEstimator -from apache_beam.runners import sdf_utils -from apache_beam.transforms.periodicsequence import ImpulseSeqGenRestrictionProvider # pylint:disable=line-too-long -from apache_beam.transforms.window import TimestampedValue -from apache_beam.utils import timestamp - - -class ImpulseStreamGenDoFn(beam.DoFn): - """ - Generates a periodic, unbounded stream of elements from a provided sequence. - - (Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence) - - This Splittable DoFn (SDF) is designed to simulate a continuous stream of - data for testing or demonstration purposes. It takes a Python sequence (e.g., - a list) and emits its elements one by one in a loop, assigning a timestamp - to each. - - The DoFn operates in two modes based on the structure of the input `data`: - - - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values - (e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each - emitted element. The timestamps are calculated by starting at a given - `start_time` and incrementing by a fixed `interval`. - - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each - tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn - will use the provided timestamp for the emitted element. - - The rate of emission is controlled by wall-clock time. The DoFn will only - emit elements whose timestamp (either calculated or provided) is in the past - compared to the current system time. When it "catches up" to the present, - it will pause and defer the remainder of the work. - - Args: - data: The sequence of elements to emit into the PCollection. The elements - can be raw values or pre-timestamped tuples in the format - `(apache_beam.utils.timestamp.Timestamp, value)`. - """ - def __init__(self, data: Sequence[Any]): - self._data = data - self._len = len(data) - self._is_timestamped_value = len(data) > 0 and isinstance( - data[0], tuple) and isinstance(data[0][0], timestamp.Timestamp) - - def _get_timestamped_value(self, index, current_output_timestamp): - if self._is_timestamped_value: - event_time, value = self._data[index % self._len] - return TimestampedValue(value, event_time) - else: - value = self._data[index % self._len] - return TimestampedValue(value, current_output_timestamp) - - @beam.DoFn.unbounded_per_element() - def process( - self, - element, - restriction_tracker=beam.DoFn.RestrictionParam( - ImpulseSeqGenRestrictionProvider()), - watermark_estimator=beam.DoFn.WatermarkEstimatorParam( - ManualWatermarkEstimator.default_provider())): - start, _, interval = element - - if isinstance(start, timestamp.Timestamp): - start = start.micros / 1000000 - - assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView) - - current_output_index = restriction_tracker.current_restriction().start - - while True: - current_output_timestamp = start + interval * current_output_index - - if current_output_timestamp > time.time(): - # we are too ahead of time, let's wait. - restriction_tracker.defer_remainder( - timestamp.Timestamp(current_output_timestamp)) - return - - if not restriction_tracker.try_claim(current_output_index): - # nothing to claim, just stop - return - - output = self._get_timestamped_value( - current_output_index, current_output_timestamp) - - current_watermark = watermark_estimator.current_watermark() - if current_watermark is None or output.timestamp > current_watermark: - # ensure watermark is monotonic - watermark_estimator.set_watermark(output.timestamp) - - yield output - - current_output_index += 1 - - -class PeriodicStream(beam.PTransform): - """A PTransform that generates a periodic stream of elements from a sequence. - - This transform creates a `PCollection` by emitting elements from a provided - Python sequence at a specified time interval. It is designed for use in - streaming pipelines to simulate a live, continuous source of data. - - The transform can be configured to: - - Emit the sequence only once. - - Repeat the sequence indefinitely or for a maximum duration. - - Control the time interval between elements. - - To ensure that the stream does not emit a burst of elements immediately at - pipeline startup, a fixed warmup period is added before the first element - is generated. - - Args: - data: The sequence of elements to emit into the PCollection. The elements - can be raw values or pre-timestamped tuples in the format - `(apache_beam.utils.timestamp.Timestamp, value)`. - max_duration: The maximum total duration in seconds for the stream - generation. If `None` (the default) and `repeat` is `True`, the - stream is effectively infinite. If `repeat` is `False`, the stream's - duration is the shorter of this value and the time required to emit - the sequence once. - interval: The delay in seconds between consecutive elements. - Defaults to 0.1. - repeat: If `True`, the input `data` sequence is emitted repeatedly. - If `False` (the default), the sequence is emitted only once. - """ - - WARMUP_TIME = 2 - - def __init__( - self, - data: Sequence[Any], - max_duration: Optional[float] = None, - interval: float = 0.1, - repeat: bool = False): - self._data = data - self._interval = interval - self._repeat = repeat - - # In `ImpulseSeqGenRestrictionProvider`, the total number of counts - # (i.e. total_outputs) is computed by ceil((end - start) / interval), - # where end is start + duration. - # Due to precision error of arithmetic operations, even if duration is set - # to len(self._data) * interval, (end - start) / interval could be a little - # bit smaller or bigger than len(self._data). - # In case of being bigger, total_outputs would be len(self._data) + 1, - # as the ceil() operation is used. - # Assuming that the precision error is no bigger than 1%, by subtracting - # a small amount, we ensure that the result after ceil is stable even if - # the precision error is present. - self._duration = len(self._data) * interval - 0.01 * interval - self._max_duration = max_duration if max_duration is not None else float( - "inf") - - def expand(self, pbegin): - # Give the runner some time to start up so the events will not cluster - # at the beginning. - start = timestamp.Timestamp.now() + PeriodicStream.WARMUP_TIME - - if not self._repeat: - stop = start + min(self._duration, self._max_duration) - else: - stop = timestamp.MAX_TIMESTAMP if math.isinf( - self._max_duration) else start + self._max_duration - - result = ( - pbegin - | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)]) - | 'GenStream' >> beam.ParDo(ImpulseStreamGenDoFn(self._data))) - return result diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py deleted file mode 100644 index 5a2a8a79ce89..000000000000 --- a/sdks/python/apache_beam/ml/ts/util_test.py +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import logging -import unittest - -import apache_beam as beam -from apache_beam.ml.ts.util import PeriodicStream -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms.window import FixedWindows -from apache_beam.utils.timestamp import Timestamp - - -class PeriodicStreamTest(unittest.TestCase): - def test_interval(self): - options = PipelineOptions() - start = Timestamp.now() - with beam.Pipeline(options=options) as p: - ret = ( - p | PeriodicStream([1, 2, 3, 4], interval=0.5) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])] - assert_that(ret, equal_to(expected)) - end = Timestamp.now() - self.assertGreaterEqual(end - start, 3) - self.assertLessEqual(end - start, 7) - - def test_repeat(self): - options = PipelineOptions() - start = Timestamp.now() - with beam.Pipeline(options=options) as p: - ret = ( - p | PeriodicStream( - [1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] - assert_that(ret, equal_to(expected)) - end = Timestamp.now() - self.assertGreaterEqual(end - start, 3) - self.assertLessEqual(end - start, 7) - - def test_timestamped_value(self): - options = PipelineOptions() - start = Timestamp.now() - with beam.Pipeline(options=options) as p: - ret = ( - p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2), - (Timestamp(2), 3), (Timestamp(1), 4)], - interval=0.5) - | beam.WindowInto(FixedWindows(0.5)) - | beam.WithKeys(0) - | beam.GroupByKey()) - expected = [(0, [1, 4]), (0, [2]), (0, [3])] - assert_that(ret, equal_to(expected)) - end = Timestamp.now() - self.assertGreaterEqual(end - start, 3) - self.assertLessEqual(end - start, 7) - - def test_stable_output(self): - options = PipelineOptions() - data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3), - (Timestamp(6), 6), (Timestamp(4), 4), (Timestamp(5), 5), - (Timestamp(7), 7), (Timestamp(8), 8), (Timestamp(9), 9), - (Timestamp(10), 10)] - expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - with beam.Pipeline(options=options) as p: - ret = (p | PeriodicStream(data, interval=0.0001)) - assert_that(ret, equal_to(expected)) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.WARNING) - unittest.main() diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 2552d592b8d5..3442b5746817 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -53,7 +53,6 @@ from apache_beam.metrics import monitoring_infos from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import MetricName -from apache_beam.ml.ts.util import PeriodicStream from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import DirectOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -1264,7 +1263,7 @@ def test_sliding_windows(self): with self.create_pipeline() as p: ret = ( p - | PeriodicStream(data, interval=1) + | PeriodicImpulse(data=data, fire_interval=1) | beam.WithKeys(0) | beam.WindowInto(beam.transforms.window.SlidingWindows(6, 3)) | beam.GroupByKey()) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index d06651a9504b..daab9d42387b 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -17,6 +17,10 @@ import math import time +import warnings +from typing import Any +from typing import Optional +from typing import Sequence import apache_beam as beam from apache_beam.io.restriction_trackers import OffsetRange @@ -36,13 +40,21 @@ class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider): def initial_restriction(self, element): start, end, interval = element if isinstance(start, Timestamp): - start = start.micros / 1000000 + start_micros = start.micros + else: + start_micros = round(start * 1000000) + if isinstance(end, Timestamp): - end = end.micros / 1000000 + end_micros = end.micros + else: + end_micros = round(end * 1000000) - assert start <= end + interval_micros = round(interval * 1000000) + + assert start_micros <= end_micros assert interval > 0 - total_outputs = math.ceil((end - start) / interval) + delta_micros: int = end_micros - start_micros + total_outputs = math.ceil(delta_micros / interval_micros) return OffsetRange(0, total_outputs) def create_tracker(self, restriction): @@ -77,8 +89,10 @@ class ImpulseSeqGenDoFn(beam.DoFn): ''' ImpulseSeqGenDoFn fn receives tuple elements with three parts: - * first_timestamp = first timestamp to output element for. - * last_timestamp = last timestamp/time to output element for. + * first_timestamp = The timestamp of the first element to be generated + (inclusive). + * last_timestamp = The timestamp marking the end of the generation period + (exclusive). No elements will be generated at or after this time. * fire_interval = how often to fire an element. For each input element received, ImpulseSeqGenDoFn fn will start @@ -91,7 +105,40 @@ class ImpulseSeqGenDoFn(beam.DoFn): ImpulseSeqGenDoFn can't guarantee that each element is output at exact time. ImpulseSeqGenDoFn guarantees that elements would not be output prior to given runtime timestamp. + + The output mode of the DoFn is based on the input `data`: + + - **None**: If `data` is None (by default), the output element will be the + timestamp. + - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values + (e.g., `[v1, v2, ...]`), the DoFn will assign a timestamp to each + emitted element. + - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each + tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn + will use the provided timestamp for the emitted element. + + See the parameter description of `PeriodicImpulse` for more information. ''' + def __init__(self, data: Optional[Sequence[Any]] = None): + self._data = data + assert self._data is None or len(self._data) > 0 + self._len = len(self._data) if self._data is not None else 0 + self._is_pre_timestamped = self._data is not None and self._len > 0 and \ + isinstance(self._data[0], tuple) and \ + isinstance(self._data[0][0], timestamp.Timestamp) + + def _get_output(self, index, current_output_timestamp): + if self._data is None: + return TimestampedValue( + current_output_timestamp, current_output_timestamp) + + if self._is_pre_timestamped: + event_time, value = self._data[index % self._len] + return TimestampedValue(value, event_time) + else: + value = self._data[index % self._len] + return TimestampedValue(value, current_output_timestamp) + @beam.DoFn.unbounded_per_element() def process( self, @@ -114,24 +161,30 @@ def process( assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView) current_output_index = restriction_tracker.current_restriction().start - current_output_timestamp = start + interval * current_output_index - current_time = time.time() - watermark_estimator.set_watermark( - timestamp.Timestamp(current_output_timestamp)) - - while current_output_timestamp <= current_time: - if restriction_tracker.try_claim(current_output_index): - yield current_output_timestamp - current_output_index += 1 - current_output_timestamp = start + interval * current_output_index - current_time = time.time() - watermark_estimator.set_watermark( + + while True: + current_output_timestamp = start + interval * current_output_index + + if current_output_timestamp > time.time(): + # we are too ahead of time, let's wait. + restriction_tracker.defer_remainder( timestamp.Timestamp(current_output_timestamp)) - else: return - restriction_tracker.defer_remainder( - timestamp.Timestamp(current_output_timestamp)) + if not restriction_tracker.try_claim(current_output_index): + # nothing to claim, just stop + return + + output = self._get_output(current_output_index, current_output_timestamp) + + current_watermark = watermark_estimator.current_watermark() + if current_watermark is None or output.timestamp > current_watermark: + # ensure watermark is monotonic + watermark_estimator.set_watermark(output.timestamp) + + yield output + + current_output_index += 1 class PeriodicSequence(PTransform): @@ -173,31 +226,113 @@ class PeriodicImpulse(PTransform): but can be used as first transform in pipeline. The PCollection generated by PeriodicImpulse is unbounded. ''' + def _validate_and_adjust_duration(self): + assert self.data + + # The total time we need to impulse all the data. + data_duration = (len(self.data) - 1) * self.interval + + is_pre_timestamped = isinstance(self.data[0], tuple) and \ + isinstance(self.data[0][0], timestamp.Timestamp) + + if isinstance(self.start_ts, Timestamp): + start = self.start_ts.micros / 1000000 + else: + start = self.start_ts + + if isinstance(self.stop_ts, Timestamp): + if self.stop_ts == MAX_TIMESTAMP: + # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the + # data's actual end time plus an extra fire interval, because the + # impulse duration's upper bound is exclusive. + end = start + data_duration + self.interval + self.stop_ts = Timestamp(micros=end * 1000000) + else: + end = self.stop_ts.micros / 1000000 + else: + end = self.stop_ts + + # The total time for the impulse signal which occurs in [start, end). + impulse_duration = end - start + if round(data_duration + self.interval, 6) < round(impulse_duration, 6): + # We don't have enough data for the impulse. + # If we can fit at least one more data point in the impulse duration, + # then we will be in the repeat mode. + message = 'The number of elements in the provided pre-timestamped ' \ + 'data sequence is not enough to span the full impulse duration. ' \ + f'Expected duration: {impulse_duration:.6f}, ' \ + f'actual data duration: {data_duration:.6f}.' + + if is_pre_timestamped: + raise ValueError( + f'{message} Please either provide more data or decrease ' + '`stop_timestamp`.') + else: + warnings.warn( + f'{message} As a result, the data sequence will be repeated to ' + 'generate elements for the entire duration.') + def __init__( self, - start_timestamp=Timestamp.now(), - stop_timestamp=MAX_TIMESTAMP, - fire_interval=360.0, - apply_windowing=False): + start_timestamp: Timestamp = Timestamp.now(), + stop_timestamp: Timestamp = MAX_TIMESTAMP, + fire_interval: float = 360.0, + apply_windowing: bool = False, + data: Optional[Sequence[Any]] = None): ''' :param start_timestamp: Timestamp for first element. - :param stop_timestamp: Timestamp after which no elements will be output. + :param stop_timestamp: Timestamp at or after which no elements will be + output. :param fire_interval: Interval in seconds at which to output elements. :param apply_windowing: Whether each element should be assigned to individual window. If false, all elements will reside in global window. + :param data: A sequence of elements to emit. The behavior depends on the + content: + + - **None (default):** The transform emits the event timestamps as + the element values, starting from start_timestamp and incrementing by + `fire_interval` up to the `stop_timestamp` (exclusive) + - **Sequence of raw values (e.g., `['a', 'b']`)**: The transform emits + each value in the sequence, assigning it an event timestamp that is + calculated in the same manner as the default scenario. The sequence + is repeated if the impulse duration requires more elements than + are in the sequence (a warning will be given in this case). + - **Sequence of pre-timestamped tuples (e.g., + `[(t1, v1), (t2, v2)]`)**: The transform emits each value with its + explicitly provided event time. The format must be + `(apache_beam.utils.timestamp.Timestamp, value)`. The provided + timestamps are used directly, overriding the calculated ones. + Note that the elements in the sequence is NOT required to be ordered + by event time; an element with a timestamp earlier than a preceding one + will be treated as a potential late event. + **Important**: In this mode, the number of elements in `data` must be + sufficient to cover the duration defined by `start_timestamp`, + `stop_timestamp`, and `fire_interval`; otherwise, a `ValueError` is + raised. ''' self.start_ts = start_timestamp self.stop_ts = stop_timestamp self.interval = fire_interval self.apply_windowing = apply_windowing + self.data = data + + if self.data: + self._validate_and_adjust_duration() def expand(self, pbegin): result = ( pbegin | 'ImpulseElement' >> beam.Create( [(self.start_ts, self.stop_ts, self.interval)]) - | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn()) - | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))) + | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn(self.data))) + + if not self.data: + # This step is only to ensure the current PTransform expansion is + # compatible with the previous Beam versions. + result = ( + result + | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))) + if self.apply_windowing: result = result | 'ApplyWindowing' >> beam.WindowInto( window.FixedWindows(self.interval)) diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index 221520c94622..fdf0995f8e5a 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -20,17 +20,25 @@ # pytype: skip-file import inspect +import logging +import random import time import unittest +from parameterized import parameterized + import apache_beam as beam from apache_beam.io.restriction_trackers import OffsetRange from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms import trigger +from apache_beam.transforms import window from apache_beam.transforms.periodicsequence import PeriodicImpulse from apache_beam.transforms.periodicsequence import PeriodicSequence from apache_beam.transforms.periodicsequence import _sequence_backlog_bytes +from apache_beam.transforms.window import FixedWindows +from apache_beam.utils.timestamp import Timestamp # Disable frequent lint warning due to pipe operator for chaining transforms. # pylint: disable=expression-not-assigned @@ -57,7 +65,44 @@ def test_periodicsequence_outputs_valid_sequence(self): self.assertEqual(result.is_bounded, False) assert_that(result, equal_to(k)) - def test_periodicimpulse_windowing_on_si(self): + def test_periodicsequence_outputs_valid_sequence_in_past(self): + start_offset = -10000 + it = time.time() + start_offset + duration = 5 + et = it + duration + interval = 1 + + with TestPipeline() as p: + result = ( + p + | 'ImpulseElement' >> beam.Create([(it, et, interval)]) + | 'ImpulseSeqGen' >> PeriodicSequence()) + + k = [it + x * interval for x in range(0, int(duration / interval), 1)] + self.assertEqual(result.is_bounded, False) + assert_that(result, equal_to(k)) + + def test_periodicsequence_output_size(self): + element = [0, 1000000000, 10] + self.assertEqual( + _sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0) + self.assertEqual( + _sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8) + self.assertEqual( + _sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16) + self.assertEqual( + _sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8) + self.assertEqual( + _sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)), + 8 * 10000 / 10) + self.assertEqual( + _sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0) + self.assertEqual( + _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) + + +class PeriodicImpulseTest(unittest.TestCase): + def test_windowing_on_si(self): start_offset = -15 it = time.time() + start_offset duration = 15 @@ -77,7 +122,7 @@ def test_periodicimpulse_windowing_on_si(self): for x in range(0, int(duration / interval), 1)] assert_that(actual, equal_to(k)) - def test_periodicimpulse_default_start(self): + def test_default_start(self): default_parameters = inspect.signature(PeriodicImpulse.__init__).parameters it = default_parameters["start_timestamp"].default duration = 1 @@ -97,40 +142,141 @@ def test_periodicimpulse_default_start(self): self.assertEqual(result.is_bounded, False) assert_that(result, equal_to(k)) - def test_periodicsequence_outputs_valid_sequence_in_past(self): - start_offset = -10000 - it = time.time() + start_offset - duration = 5 - et = it + duration - interval = 1 + @unittest.skip("hard to determine warm-up time and threshold for runners.") + def test_processing_time(self): + warmup_time = 3 + threshold = 0.5 + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp.now() + warmup_time, + data=[10, 20, 30], + fire_interval=2) + | beam.Map(lambda _: time.time()) + | beam.WindowInto( + window.GlobalWindows(), + trigger=trigger.Repeatedly(trigger.AfterCount(3)), + accumulation_mode=trigger.AccumulationMode.DISCARDING, + ) + | beam.GroupBy() + | beam.FlatMap(lambda x: [v - min(x[1]) for v in x[1]])) + expected = [0, 2, 4] + assert_that(ret, equal_to(expected, lambda x, y: abs(x - y) < threshold)) + @parameterized.expand([0.5, 1, 2, 10]) + def test_stop_over_by_epsilon(self, interval): with TestPipeline() as p: - result = ( - p - | 'ImpulseElement' >> beam.Create([(it, et, interval)]) - | 'ImpulseSeqGen' >> PeriodicSequence()) + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp(seconds=1), + stop_timestamp=Timestamp(seconds=1, micros=1), + data=[1, 2], + fire_interval=interval) + | beam.WindowInto(FixedWindows(interval)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [ + (0, [1]), + ] + assert_that(ret, equal_to(expected)) - k = [it + x * interval for x in range(0, int(duration / interval), 1)] - self.assertEqual(result.is_bounded, False) - assert_that(result, equal_to(k)) + @parameterized.expand([1, 2]) + def test_stop_over_by_interval(self, interval): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp(seconds=1), + stop_timestamp=Timestamp(seconds=1 + interval), + data=[1, 2], + fire_interval=interval) + | beam.WindowInto(FixedWindows(interval)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1])] + assert_that(ret, equal_to(expected)) - def test_periodicsequence_output_size(self): - element = [0, 1000000000, 10] - self.assertEqual( - _sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0) - self.assertEqual( - _sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8) - self.assertEqual( - _sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16) - self.assertEqual( - _sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8) - self.assertEqual( - _sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)), - 8 * 10000 / 10) - self.assertEqual( - _sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0) - self.assertEqual( - _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) + @parameterized.expand([1, 2]) + def test_stop_over_by_interval_and_epsilon(self, interval): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp(seconds=1), + stop_timestamp=Timestamp(seconds=1 + interval, micros=1), + data=[1, 2], + fire_interval=interval) + | beam.WindowInto(FixedWindows(interval)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2])] + assert_that(ret, equal_to(expected)) + + def test_interval(self): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse(data=[1, 2, 3, 4], fire_interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])] + assert_that(ret, equal_to(expected)) + + def test_repeat(self): + now = Timestamp.now() + with self.assertWarnsRegex(UserWarning, "not enough to span"): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, + stop_timestamp=now + 2.6, + data=[1, 2, 3, 4], + fire_interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])] + assert_that(ret, equal_to(expected)) + + def test_timestamped_value(self): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + data=[(Timestamp(1), 1), (Timestamp(3), 2), (Timestamp(2), 3), + (Timestamp(1), 4)], + fire_interval=0.5) + | beam.WindowInto(FixedWindows(0.5)) + | beam.WithKeys(0) + | beam.GroupByKey()) + expected = [(0, [1, 4]), (0, [2]), (0, [3])] + assert_that(ret, equal_to(expected)) + + def test_not_enough_timestamped_value(self): + now = Timestamp.now() + data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3)] + with self.assertRaisesRegex(ValueError, "not enough to span"): + with TestPipeline() as p: + _ = ( + p | PeriodicImpulse( + start_timestamp=now, + stop_timestamp=now + 2.6, + data=data, + fire_interval=0.5)) + + def test_fuzzy_interval(self): + seed = int(time.time() * 1000) + times = 30 + logging.warning("random seed=%d", seed) + random.seed(seed) + for _ in range(times): + n = int(random.randint(1, 100)) + data = list(range(n)) + m = random.randint(1, 1000) + interval = m / 1e6 + now = Timestamp.now() + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, data=data, fire_interval=interval)) + assert_that(ret, equal_to(data)) if __name__ == '__main__':