diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 8a28499555c1..bf8d123cd368 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -112,6 +112,10 @@ cdef class BigEndianShortCoderImpl(StreamCoderImpl): pass +cdef class BigEndianInt32CoderImpl(StreamCoderImpl): + pass + + cdef class SinglePrecisionFloatCoderImpl(StreamCoderImpl): pass diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 5dff35052901..d996e69a13b9 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -758,6 +758,22 @@ def estimate_size(self, unused_value, nested=False): if unused_value is not None else 0) +class BigEndianInt32CoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" + def encode_to_stream(self, value, out, nested): + # type: (int, create_OutputStream, bool) -> None + out.write_bigendian_int32(value) + + def decode_from_stream(self, in_stream, nested): + # type: (create_InputStream, bool) -> int + return in_stream.read_bigendian_int32() + + def estimate_size(self, unused_value, nested=False): + # type: (Any, bool) -> int + # An int32 is encoded as 4 bytes, regardless of nesting. + return 4 + + class BigEndianShortCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, out, nested): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e6250532aef1..7f273017224e 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -113,7 +113,9 @@ 'WindowedValueCoder', 'ParamWindowedValueCoder', 'BigIntegerCoder', - 'DecimalCoder' + 'DecimalCoder', + 'BigEndianInt32Coder', + 'BigEndianShortCoder' ] T = TypeVar('T') @@ -669,6 +671,25 @@ def __hash__(self): return hash(type(self)) +class BigEndianInt32Coder(FastCoder): + """A coder used for big-endian int32 values.""" + def _create_impl(self): + return coder_impl.BigEndianInt32CoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + def to_type_hint(self): + return int + + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + + class SinglePrecisionFloatCoder(FastCoder): """A coder used for single-precision floating-point values.""" def _create_impl(self): diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index f3381cdb1d69..70926c6e8b69 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -164,6 +164,7 @@ def tearDownClass(cls): coders.SinglePrecisionFloatCoder, coders.ToBytesCoder, coders.BigIntegerCoder, # tested in DecimalCoder + coders.BigEndianInt32Coder, coders.TimestampPrefixingOpaqueWindowCoder, ]) cls.seen_nested -= set(