From e50e56438c681fcb9e8ecfa3fef7b20a7c38ba3c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 28 Jun 2025 00:04:09 -0400 Subject: [PATCH 1/2] Fix flakiness in fuzzy test. --- .../transforms/periodicsequence.py | 59 ++++++++----------- .../transforms/periodicsequence_test.py | 41 +++++++++---- 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index daab9d42387b..3c05e666b663 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -33,28 +33,26 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp from apache_beam.utils.timestamp import MAX_TIMESTAMP +from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import Timestamp class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider): def initial_restriction(self, element): start, end, interval = element - if isinstance(start, Timestamp): - start_micros = start.micros - else: - start_micros = round(start * 1000000) + if not isinstance(start, Timestamp): + start = Timestamp.of(start) - if isinstance(end, Timestamp): - end_micros = end.micros - else: - end_micros = round(end * 1000000) + if not isinstance(end, Timestamp): + end = Timestamp.of(end) - interval_micros = round(interval * 1000000) + interval_duration = Duration(interval) - assert start_micros <= end_micros + assert start <= end assert interval > 0 - delta_micros: int = end_micros - start_micros - total_outputs = math.ceil(delta_micros / interval_micros) + total_duration = end - start + total_outputs = math.ceil(total_duration.micros / interval_duration.micros) + return OffsetRange(0, total_outputs) def create_tracker(self, restriction): @@ -75,6 +73,9 @@ def _sequence_backlog_bytes(element, now, offset_range): start, _, interval = element if isinstance(start, Timestamp): start = start.micros / 1000000 + elif isinstance(start, int): + start = start / 1000000 + assert interval > 0 now_index = math.floor((now - start) / interval) @@ -157,6 +158,8 @@ def process( if isinstance(start, Timestamp): start = start.micros / 1000000 + elif isinstance(start, int): + start = start / 1000000 assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView) @@ -230,38 +233,28 @@ def _validate_and_adjust_duration(self): assert self.data # The total time we need to impulse all the data. - data_duration = (len(self.data) - 1) * self.interval + data_duration = (len(self.data) - 1) * Duration(self.interval) is_pre_timestamped = isinstance(self.data[0], tuple) and \ isinstance(self.data[0][0], timestamp.Timestamp) - if isinstance(self.start_ts, Timestamp): - start = self.start_ts.micros / 1000000 - else: - start = self.start_ts - - if isinstance(self.stop_ts, Timestamp): - if self.stop_ts == MAX_TIMESTAMP: - # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the - # data's actual end time plus an extra fire interval, because the - # impulse duration's upper bound is exclusive. - end = start + data_duration + self.interval - self.stop_ts = Timestamp(micros=end * 1000000) - else: - end = self.stop_ts.micros / 1000000 - else: - end = self.stop_ts + if self.stop_ts == MAX_TIMESTAMP: + # When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the + # data's actual end time plus an extra fire interval, because the + # impulse duration's upper bound is exclusive. + self.stop_ts = self.start_ts + data_duration + Duration(self.interval) # The total time for the impulse signal which occurs in [start, end). - impulse_duration = end - start - if round(data_duration + self.interval, 6) < round(impulse_duration, 6): + impulse_duration = self.stop_ts - self.start_ts + #if round(data_duration + self.interval, 6) < round(impulse_duration, 6): + if data_duration + Duration(self.interval) < impulse_duration: # We don't have enough data for the impulse. # If we can fit at least one more data point in the impulse duration, # then we will be in the repeat mode. message = 'The number of elements in the provided pre-timestamped ' \ 'data sequence is not enough to span the full impulse duration. ' \ - f'Expected duration: {impulse_duration:.6f}, ' \ - f'actual data duration: {data_duration:.6f}.' + f'Expected duration: {impulse_duration}, ' \ + f'actual data duration: {data_duration}.' if is_pre_timestamped: raise ValueError( diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index fdf0995f8e5a..e17cc6349f1a 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -261,23 +261,44 @@ def test_not_enough_timestamped_value(self): data=data, fire_interval=0.5)) - def test_fuzzy_interval(self): - seed = int(time.time() * 1000) + def test_fuzzy_length_and_interval(self): times = 30 - logging.warning("random seed=%d", seed) - random.seed(seed) for _ in range(times): + seed = int(time.time() * 1000) + random.seed(seed) n = int(random.randint(1, 100)) data = list(range(n)) m = random.randint(1, 1000) interval = m / 1e6 now = Timestamp.now() - with TestPipeline() as p: - ret = ( - p | PeriodicImpulse( - start_timestamp=now, data=data, fire_interval=interval)) - assert_that(ret, equal_to(data)) - + try: + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, data=data, fire_interval=interval)) + assert_that(ret, equal_to(data)) + except Exception as e: # pylint: disable=broad-except + logging.error("Error occurred at random seed=%d", seed) + raise e + + def test_fuzzy_length_at_minimal_interval(self): + times = 30 + for _ in range(times): + seed = int(time.time() * 1000) + random.seed(seed) + n = int(random.randint(1, 100)) + data = list(range(n)) + interval = 1e-6 + now = Timestamp.now() + try: + with TestPipeline() as p: + ret = ( + p | PeriodicImpulse( + start_timestamp=now, data=data, fire_interval=interval)) + assert_that(ret, equal_to(data)) + except Exception as e: # pylint: disable=broad-except + logging.error("Error occurred at random seed=%d", seed) + raise e if __name__ == '__main__': unittest.main() From eb9c49c8d0f6447e453a5296f4def7a705ecc18a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 28 Jun 2025 00:25:03 -0400 Subject: [PATCH 2/2] Apply yapf. --- sdks/python/apache_beam/transforms/periodicsequence_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index e17cc6349f1a..62f9925fd2b0 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -300,5 +300,6 @@ def test_fuzzy_length_at_minimal_interval(self): logging.error("Error occurred at random seed=%d", seed) raise e + if __name__ == '__main__': unittest.main()