From 848e55db317d5a0a05d3f61bac83afb2b9b03934 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 9 Mar 2025 16:15:14 -0400 Subject: [PATCH 1/2] Added BigEndianInt32Coder --- sdks/python/apache_beam/coders/coder_impl.pxd | 4 ++++ sdks/python/apache_beam/coders/coder_impl.py | 16 +++++++++++++ sdks/python/apache_beam/coders/coders.py | 23 ++++++++++++++++++- 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 8a28499555c1..bf8d123cd368 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -112,6 +112,10 @@ cdef class BigEndianShortCoderImpl(StreamCoderImpl): pass +cdef class BigEndianInt32CoderImpl(StreamCoderImpl): + pass + + cdef class SinglePrecisionFloatCoderImpl(StreamCoderImpl): pass diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 5dff35052901..d996e69a13b9 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -758,6 +758,22 @@ def estimate_size(self, unused_value, nested=False): if unused_value is not None else 0) +class BigEndianInt32CoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" + def encode_to_stream(self, value, out, nested): + # type: (int, create_OutputStream, bool) -> None + out.write_bigendian_int32(value) + + def decode_from_stream(self, in_stream, nested): + # type: (create_InputStream, bool) -> int + return in_stream.read_bigendian_int32() + + def estimate_size(self, unused_value, nested=False): + # type: (Any, bool) -> int + # An int32 is encoded as 4 bytes, regardless of nesting. + return 4 + + class BigEndianShortCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, out, nested): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e6250532aef1..7f273017224e 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -113,7 +113,9 @@ 'WindowedValueCoder', 'ParamWindowedValueCoder', 'BigIntegerCoder', - 'DecimalCoder' + 'DecimalCoder', + 'BigEndianInt32Coder', + 'BigEndianShortCoder' ] T = TypeVar('T') @@ -669,6 +671,25 @@ def __hash__(self): return hash(type(self)) +class BigEndianInt32Coder(FastCoder): + """A coder used for big-endian int32 values.""" + def _create_impl(self): + return coder_impl.BigEndianInt32CoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + def to_type_hint(self): + return int + + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + + class SinglePrecisionFloatCoder(FastCoder): """A coder used for single-precision floating-point values.""" def _create_impl(self): From 11955789b495c707b2555fe0dcdc02dd85f8e9ac Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 9 Mar 2025 16:49:09 -0400 Subject: [PATCH 2/2] tests --- sdks/python/apache_beam/coders/coders_test_common.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index f3381cdb1d69..70926c6e8b69 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -164,6 +164,7 @@ def tearDownClass(cls): coders.SinglePrecisionFloatCoder, coders.ToBytesCoder, coders.BigIntegerCoder, # tested in DecimalCoder + coders.BigEndianInt32Coder, coders.TimestampPrefixingOpaqueWindowCoder, ]) cls.seen_nested -= set(