diff --git a/.github/trigger_files/beam_PostCommit_XVR_Direct.json b/.github/trigger_files/beam_PostCommit_XVR_Direct.json index 236b7bee8af8..262d546418db 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_Direct.json +++ b/.github/trigger_files/beam_PostCommit_XVR_Direct.json @@ -1,3 +1,4 @@ { - "https://github.com/apache/beam/pull/32648": "testing Flink 1.19 support" + "https://github.com/apache/beam/pull/32648": "testing Flink 1.19 support", + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJavaSQL_Dataflow.json b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJavaSQL_Dataflow.json index 9e26dfeeb6e6..bb31ea07c195 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJavaSQL_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_XVR_PythonUsingJavaSQL_Dataflow.json @@ -1 +1,3 @@ -{} \ No newline at end of file +{ + "modification": 1 +} \ No newline at end of file diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 36f0b7a4ab25..49cbbdd17e69 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -1002,7 +1002,7 @@ def decode(self, encoded): def estimate_size(self, value, nested=False): # type: (Any, bool) -> int # Note that VarInts are encoded the same way regardless of nesting. - return get_varint_size(value & 0xFFFFFFFF) + return get_varint_size(int(value) & 0xFFFFFFFF) class SingletonCoderImpl(CoderImpl): diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index de5377cc94f0..24ce637f0420 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -26,7 +26,7 @@ cdef class OutputStream(object): cpdef write(self, bytes b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_var_int64(self, libc.stdint.int64_t v) - cpdef write_var_int32(self, libc.stdint.int32_t v) + cpdef write_var_int32(self, libc.stdint.int64_t v) cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v) cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v) @@ -45,7 +45,7 @@ cdef class ByteCountingOutputStream(OutputStream): cpdef write(self, bytes b, bint nested=*) cpdef write_var_int64(self, libc.stdint.int64_t val) - cpdef write_var_int32(self, libc.stdint.int32_t val) + cpdef write_var_int32(self, libc.stdint.int64_t val) cpdef write_byte(self, unsigned char val) cpdef write_bigendian_int64(self, libc.stdint.int64_t val) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val) diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index d9de4afbec79..dbc671d7f1a8 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -73,9 +73,10 @@ cdef class OutputStream(object): if not v: break - cpdef write_var_int32(self, libc.stdint.int32_t signed_v): + cpdef write_var_int32(self, libc.stdint.int64_t signed_v): """Encode an int using variable-length encoding to a stream.""" - cdef libc.stdint.int64_t v = signed_v & (0xFFFFFFFF) + # for backward compatibility, input type is int64_t thus tolerates overflow + cdef libc.stdint.int64_t v = signed_v & 0xFFFFFFFF self.write_var_int64(v) cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v): @@ -156,7 +157,7 @@ cdef class ByteCountingOutputStream(OutputStream): cpdef write_var_int64(self, libc.stdint.int64_t signed_v): self.count += get_varint_size(signed_v) - cpdef write_var_int32(self, libc.stdint.int32_t signed_v): + cpdef write_var_int32(self, libc.stdint.int64_t signed_v): if signed_v < 0: self.count += 5 else: