From 3d15fa010d8a980cb186be77ec00dad057240dda Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Jun 2025 15:57:48 -0400 Subject: [PATCH 1/2] Fix an unstable output issue of periodic sequence due to precision error. --- sdks/python/apache_beam/ml/ts/util.py | 14 +++++++++++++- sdks/python/apache_beam/ml/ts/util_test.py | 11 +++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py index 4005f57e0047..57a2b5934706 100644 --- a/sdks/python/apache_beam/ml/ts/util.py +++ b/sdks/python/apache_beam/ml/ts/util.py @@ -159,7 +159,19 @@ def __init__( self._data = data self._interval = interval self._repeat = repeat - self._duration = len(self._data) * interval + + # In `ImpulseSeqGenRestrictionProvider`, the total number of counts + # (i.e. total_outputs) is computed by ceil((end - start) / interval), + # where end is start + duration * interval. + # Due to precision error of arithmetic operations, even if duration is set + # to len(self._data), (end - start) / interval could be a little bit smaller + # or bigger than len(self._data). + # In case of being bigger, total_outputs would be len(self._data) + 1, + # as the ceil() operation is used. + # Assuming that the precision error is no bigger than 1%, by subtracting + # a small amount, we ensure that the result after ceil is stable even if + # the precision error is present. + self._duration = len(self._data) * interval - 0.01 * interval self._max_duration = max_duration if max_duration is not None else float( "inf") diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py index ac2bc6ea701f..5a2a8a79ce89 100644 --- a/sdks/python/apache_beam/ml/ts/util_test.py +++ b/sdks/python/apache_beam/ml/ts/util_test.py @@ -76,6 +76,17 @@ def test_timestamped_value(self): self.assertGreaterEqual(end - start, 3) self.assertLessEqual(end - start, 7) + def test_stable_output(self): + options = PipelineOptions() + data = [(Timestamp(1), 1), (Timestamp(2), 2), (Timestamp(3), 3), + (Timestamp(6), 6), (Timestamp(4), 4), (Timestamp(5), 5), + (Timestamp(7), 7), (Timestamp(8), 8), (Timestamp(9), 9), + (Timestamp(10), 10)] + expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + with beam.Pipeline(options=options) as p: + ret = (p | PeriodicStream(data, interval=0.0001)) + assert_that(ret, equal_to(expected)) + if __name__ == '__main__': logging.getLogger().setLevel(logging.WARNING) From 760450900b4837c25df7a9dfb459dd6127f0e1d1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Jun 2025 20:42:24 -0400 Subject: [PATCH 2/2] Minor fix on the comment. --- sdks/python/apache_beam/ml/ts/util.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py index 57a2b5934706..a50f2667621b 100644 --- a/sdks/python/apache_beam/ml/ts/util.py +++ b/sdks/python/apache_beam/ml/ts/util.py @@ -162,10 +162,10 @@ def __init__( # In `ImpulseSeqGenRestrictionProvider`, the total number of counts # (i.e. total_outputs) is computed by ceil((end - start) / interval), - # where end is start + duration * interval. + # where end is start + duration. # Due to precision error of arithmetic operations, even if duration is set - # to len(self._data), (end - start) / interval could be a little bit smaller - # or bigger than len(self._data). + # to len(self._data) * interval, (end - start) / interval could be a little + # bit smaller or bigger than len(self._data). # In case of being bigger, total_outputs would be len(self._data) + 1, # as the ceil() operation is used. # Assuming that the precision error is no bigger than 1%, by subtracting