From 0bbf3e8cd6bc613adff84eff3acf14907a01ba64 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 28 Jun 2025 15:22:53 -0400 Subject: [PATCH 1/4] Support TimestampTypes in PeriodicImpulse. Fix flakiness in fuzzy tests. --- .../transforms/periodicsequence.py | 62 ++++++++----------- .../transforms/periodicsequence_test.py | 41 +++++++++--- 2 files changed, 59 insertions(+), 44 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index daab9d42387b..d9d25247112b 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -32,29 +32,28 @@ from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp +from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import Timestamp +from apache_beam.utils.timestamp import TimestampTypes class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider): def initial_restriction(self, element): start, end, interval = element - if isinstance(start, Timestamp): - start_micros = start.micros - else: - start_micros = round(start * 1000000) + if not isinstance(start, Timestamp): + start = Timestamp.of(start) - if isinstance(end, Timestamp): - end_micros = end.micros - else: - end_micros = round(end * 1000000) + if not isinstance(end, Timestamp): + end = Timestamp.of(end) - interval_micros = round(interval * 1000000) + interval_duration = Duration(interval) - assert start_micros <= end_micros + assert start <= end assert interval > 0 - delta_micros: int = end_micros - start_micros - total_outputs = math.ceil(delta_micros / interval_micros) + total_duration = end - start + total_outputs = math.ceil(total_duration.micros / interval_duration.micros) + return OffsetRange(0, total_outputs) def create_tracker(self, restriction): @@ -230,38 +229,31 @@ def _validate_and_adjust_duration(self): assert self.data # The total time we need to impulse all the data. - data_duration = (len(self.data) - 1) * self.interval + data_duration = (len(self.data) - 1) * Duration(self.interval) is_pre_timestamped = isinstance(self.data[0], tuple) and \ isinstance(self.data[0][0], timestamp.Timestamp) - if isinstance(self.start_ts, Timestamp): - start = self.start_ts.micros / 1000000 - else: - start = self.start_ts - - if isinstance(self.stop_ts, Timestamp): - if self.stop_ts == MAX_TIMESTAMP: - # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the - # data's actual end time plus an extra fire interval, because the - # impulse duration's upper bound is exclusive. - end = start + data_duration + self.interval - self.stop_ts = Timestamp(micros=end * 1000000) - else: - end = self.stop_ts.micros / 1000000 - else: - end = self.stop_ts + start_ts = Timestamp.of(self.start_ts) + stop_ts = Timestamp.of(self.stop_ts) + + if stop_ts == MAX_TIMESTAMP: + # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the + # data's actual end time plus an extra fire interval, because the + # impulse duration's upper bound is exclusive. + self.stop_ts = start_ts + data_duration + Duration(self.interval) + stop_ts = self.stop_ts # The total time for the impulse signal which occurs in [start, end). - impulse_duration = end - start - if round(data_duration + self.interval, 6) < round(impulse_duration, 6): + impulse_duration = stop_ts - start_ts + if data_duration + Duration(self.interval) < impulse_duration: # We don't have enough data for the impulse. # If we can fit at least one more data point in the impulse duration, # then we will be in the repeat mode. message = 'The number of elements in the provided pre-timestamped ' \ 'data sequence is not enough to span the full impulse duration. ' \ - f'Expected duration: {impulse_duration:.6f}, ' \ - f'actual data duration: {data_duration:.6f}.' + f'Expected duration: {impulse_duration}, ' \ + f'actual data duration: {data_duration}.' if is_pre_timestamped: raise ValueError( @@ -274,8 +266,8 @@ def _validate_and_adjust_duration(self): def __init__( self, - start_timestamp: Timestamp = Timestamp.now(), - stop_timestamp: Timestamp = MAX_TIMESTAMP, + start_timestamp: TimestampTypes = Timestamp.now(), + stop_timestamp: TimestampTypes = MAX_TIMESTAMP, fire_interval: float = 360.0, apply_windowing: bool = False, data: Optional[Sequence[Any]] = None): diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index fdf0995f8e5a..c1f4fa954320 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -261,22 +261,45 @@ def test_not_enough_timestamped_value(self): data=data, fire_interval=0.5)) - def test_fuzzy_interval(self): - seed = int(time.time() * 1000) + def test_fuzzy_length_and_interval(self): times = 30 - logging.warning("random seed=%d", seed) - random.seed(seed) for _ in range(times): + seed = int(time.time() * 1000) + random.seed(seed) n = int(random.randint(1, 100)) data = list(range(n)) m = random.randint(1, 1000) interval = m / 1e6 now = Timestamp.now() - with TestPipeline() as p: - ret = ( - p | PeriodicImpulse( - start_timestamp=now, data=data, fire_interval=interval)) - assert_that(ret, equal_to(data)) + try: + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, data=data, fire_interval=interval)) + assert_that(ret, equal_to(data)) + except Exception as e: # pylint: disable=broad-except + logging.error("Error occurred at random seed=%d", seed) + raise e + + def test_fuzzy_length_at_minimal_interval(self): + times = 30 + for _ in range(times): + seed = int(time.time() * 1000) + seed = 1751135957975 + random.seed(seed) + n = int(random.randint(1, 100)) + data = list(range(n)) + interval = 1e-6 + now = Timestamp.now() + try: + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, data=data, fire_interval=interval)) + assert_that(ret, equal_to(data)) + except Exception as e: # pylint: disable=broad-except + logging.error("Error occurred at random seed=%d", seed) + raise e if __name__ == '__main__': From be757f7ffe20f867d314e5b8d7fa120db4539abc Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 28 Jun 2025 15:59:54 -0400 Subject: [PATCH 2/4] Fix lints. --- sdks/python/apache_beam/transforms/periodicsequence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index d9d25247112b..791d7ab31ae4 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -32,8 +32,8 @@ from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp -from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import MAX_TIMESTAMP +from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.timestamp import TimestampTypes From 90b885f6c4375666703400a08b668f668df728b8 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 28 Jun 2025 23:17:55 -0400 Subject: [PATCH 3/4] Add tests for different timestamp input types. --- .../transforms/periodicsequence_test.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index c1f4fa954320..fce2061614af 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -301,6 +301,39 @@ def test_fuzzy_length_at_minimal_interval(self): logging.error("Error occurred at random seed=%d", seed) raise e + def test_int_type_input(self): + # This test is to verify that if input timestamps and interval are integers, + # the generated timestamped values are also integers. + # This is necessary for the following test to pass: + # apache_beam.examples.snippets.snippets_test.SlowlyChangingSideInputsTest + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=1, stop_timestamp=5, fire_interval=1)) + expected = [1, 2, 3, 4] + assert_that( + ret, equal_to(expected, lambda x, y: type(x) is type(y) and x == y)) + + def test_float_type_input(self): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=1.0, stop_timestamp=5.0, fire_interval=1)) + expected = [1.0, 2.0, 3.0, 4.0] + assert_that( + ret, equal_to(expected, lambda x, y: type(x) is type(y) and x == y)) + + def test_timestamp_type_input(self): + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=Timestamp.of(1), + stop_timestamp=Timestamp.of(5), + fire_interval=1)) + expected = [1.0, 2.0, 3.0, 4.0] + assert_that( + ret, equal_to(expected, lambda x, y: type(x) is type(y) and x == y)) + if __name__ == '__main__': unittest.main() From 90b9f9c8fa669061a55ad0ca4726fd68acbbadc4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 28 Jun 2025 23:42:45 -0400 Subject: [PATCH 4/4] Use an identity function in map after ImpulseSeqGenDoFn --- sdks/python/apache_beam/transforms/periodicsequence.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 791d7ab31ae4..8916de0fa58a 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -319,11 +319,11 @@ def expand(self, pbegin): | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn(self.data))) if not self.data: - # This step is only to ensure the current PTransform expansion is - # compatible with the previous Beam versions. - result = ( - result - | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))) + # This step is actually an identity transform, because the Timestamped + # values have already been generated in `ImpulseSeqGenDoFn`. + # We keep this step here to prevent the current PeriodicImpulse from + # breaking the compatibility. + result = (result | 'MapToTimestamped' >> beam.Map(lambda tt: tt)) if self.apply_windowing: result = result | 'ApplyWindowing' >> beam.WindowInto(