From 62b2c4476bff4df2fe9d6578d03f62f3a74fba41 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 2 May 2025 22:53:34 +0000 Subject: [PATCH 1/5] Encode paneinfo with PaneInfoCoder. --- sdks/python/apache_beam/coders/coders.py | 15 ++++++++++++++- sdks/python/apache_beam/coders/typecoders.py | 2 ++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index cb23e3967e33..347799c6d84b 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -113,7 +113,8 @@ 'WindowedValueCoder', 'ParamWindowedValueCoder', 'BigIntegerCoder', - 'DecimalCoder' + 'DecimalCoder', + 'PaneInfoCoder' ] T = TypeVar('T') @@ -1753,6 +1754,18 @@ def __hash__(self): return hash(type(self)) +class PaneInfoCoder(FastCoder): + def _create_impl(self): + return coder_impl.PaneInfoCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + def to_type_hint(self): + return windowed_value.PaneInfo + + class DecimalCoder(FastCoder): def _create_impl(self): return coder_impl.DecimalCoderImpl() diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 892f508d0136..19300c675596 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -73,6 +73,7 @@ def MakeXyzs(v): from apache_beam.coders import coders from apache_beam.typehints import typehints +from apache_beam.utils import windowed_value __all__ = ['registry'] @@ -92,6 +93,7 @@ def register_standard_coders(self, fallback_coder): self._register_coder_internal(bytes, coders.BytesCoder) self._register_coder_internal(bool, coders.BooleanCoder) self._register_coder_internal(str, coders.StrUtf8Coder) + self._register_coder_internal(windowed_value.PaneInfo, coders.PaneInfoCoder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) self._register_coder_internal(typehints.DictConstraint, coders.MapCoder) self._register_coder_internal( From 3305c8f2c27489477e465f81074e0004ef9722fb Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 2 May 2025 23:17:48 +0000 Subject: [PATCH 2/5] Fix tests. --- .../apache_beam/coders/coders_test_common.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index bed93cbc5545..62e269194f3c 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -362,6 +362,20 @@ def test_interval_window_coder(self): coders.TupleCoder((coders.IntervalWindowCoder(), )), (window.IntervalWindow(0, 10), )) + def test_paneinfo_window_coder(self): + self.check_coder( + coders.PaneInfoCoder(), + *[ + windowed_value.PaneInfo( + is_first=False, + is_last=False, + timing=windowed_value.PaneInfoTiming.EARLY, # 0 + index=1, + nonspeculative_index=-1 + ) + for y in range(0, 10) + ]) + def test_timestamp_coder(self): self.check_coder( coders.TimestampCoder(), @@ -539,6 +553,7 @@ def test_windowed_value_coder(self): def test_param_windowed_value_coder(self): from apache_beam.transforms.window import IntervalWindow from apache_beam.utils.windowed_value import PaneInfo + # pylint: disable=too-many-function-args wv = windowed_value.create( b'', # Milliseconds to microseconds From 0082b84c9149d14c146837c5f516d8e1bc9e1a4f Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 3 May 2025 13:50:58 +0000 Subject: [PATCH 3/5] Fix import. --- sdks/python/apache_beam/coders/coders.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 347799c6d84b..c1cb58baeda9 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -64,6 +64,7 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.typehints import typehints from apache_beam.utils import proto_utils +from apache_beam.utils import windowed_value if TYPE_CHECKING: from apache_beam.coders.typecoders import CoderRegistry From fc49666d78a0b39eaa1f24d6852d3a07292bf220 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 3 May 2025 14:09:18 +0000 Subject: [PATCH 4/5] Add typecoder test. --- .../apache_beam/coders/coders_test_common.py | 12 +++++------- .../apache_beam/coders/typecoders_test.py | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 62e269194f3c..21de0e70d800 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -367,13 +367,11 @@ def test_paneinfo_window_coder(self): coders.PaneInfoCoder(), *[ windowed_value.PaneInfo( - is_first=False, - is_last=False, - timing=windowed_value.PaneInfoTiming.EARLY, # 0 - index=1, - nonspeculative_index=-1 - ) - for y in range(0, 10) + is_first=y == 0, + is_last=y == 9, + timing=windowed_value.PaneInfoTiming.EARLY, + index=y, + nonspeculative_index=-1) for y in range(0, 10) ]) def test_timestamp_coder(self): diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py index 3adc8255409d..3c59cff68651 100644 --- a/sdks/python/apache_beam/coders/typecoders_test.py +++ b/sdks/python/apache_beam/coders/typecoders_test.py @@ -24,6 +24,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.typehints import typehints +from apache_beam.utils import windowed_value class CustomClass(object): @@ -141,6 +142,24 @@ def test_nullable_coder(self): self.assertEqual(expected_coder.encode(None), real_coder.encode(None)) self.assertEqual(expected_coder.encode(b'abc'), real_coder.encode(b'abc')) + def test_paneinfo_coder(self): + expected_coder = coders.PaneInfoCoder() + real_coder = typecoders.registry.get_coder(windowed_value.PaneInfo) + self.assertEqual(expected_coder, real_coder) + for i in range(10): + pane_info = windowed_value.PaneInfo( + is_first=i==0, + is_last=i==9, + timing=windowed_value.PaneInfoTiming.EARLY, # 0 + index=i, + nonspeculative_index=-1 + ) + + encoded = real_coder.encode(pane_info) + + self.assertEqual(expected_coder.encode(pane_info), encoded) + self.assertEqual(pane_info, real_coder.decode(encoded)) + if __name__ == '__main__': unittest.main() From 6bba980a6dca49215ce43a4eb807d2927c1adedf Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 4 May 2025 18:45:25 -0400 Subject: [PATCH 5/5] Implement eq and hash. --- sdks/python/apache_beam/coders/coders.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index c1cb58baeda9..e7bf9e02b02a 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1766,6 +1766,12 @@ def is_deterministic(self): def to_type_hint(self): return windowed_value.PaneInfo + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class DecimalCoder(FastCoder): def _create_impl(self):