From 71ddef77efdce7fa4331a13c96d3e2c835f974f8 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 19 Mar 2025 23:10:39 -0400 Subject: [PATCH 1/3] Create a varint32 coder and used it for RowCoder --- ...am_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- sdks/python/apache_beam/coders/coder_impl.pxd | 5 +++ sdks/python/apache_beam/coders/coder_impl.py | 31 +++++++++++++++++++ sdks/python/apache_beam/coders/coders.py | 21 ++++++++++++- .../apache_beam/coders/coders_test_common.py | 14 +++++++++ sdks/python/apache_beam/coders/row_coder.py | 5 ++- .../apache_beam/coders/row_coder_test.py | 23 ++++++++++++++ sdks/python/apache_beam/coders/slow_stream.py | 7 +++++ sdks/python/apache_beam/coders/stream.pxd | 4 +++ sdks/python/apache_beam/coders/stream.pyx | 18 ++++++++++- sdks/python/apache_beam/coders/stream_test.py | 21 +++++++++++++ .../io/external/xlang_jdbcio_it_test.py | 16 +++++----- 12 files changed, 156 insertions(+), 11 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index f1ba03a243ee..e0266d62f2e0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 5 + "modification": 4 } diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 8a28499555c1..27cffe7b62df 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -130,6 +130,11 @@ cdef class VarIntCoderImpl(StreamCoderImpl): cpdef bytes encode(self, value) +cdef class VarInt32CoderImpl(StreamCoderImpl): + @cython.locals(ivalue=libc.stdint.int32_t) + cpdef bytes encode(self, value) + + cdef class SingletonCoderImpl(CoderImpl): cdef object _value diff --git 
a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 5dff35052901..36f0b7a4ab25 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -974,6 +974,37 @@ def estimate_size(self, value, nested=False): return get_varint_size(value) +class VarInt32CoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for int32 objects.""" + def encode_to_stream(self, value, out, nested): + # type: (int, create_OutputStream, bool) -> None + out.write_var_int32(value) + + def decode_from_stream(self, in_stream, nested): + # type: (create_InputStream, bool) -> int + return in_stream.read_var_int32() + + def encode(self, value): + ivalue = value # type cast + if 0 <= ivalue < len(small_ints): + return small_ints[ivalue] + return StreamCoderImpl.encode(self, value) + + def decode(self, encoded): + if len(encoded) == 1: + i = ord(encoded) + if 0 <= i < 128: + return i + return StreamCoderImpl.decode(self, encoded) + + def estimate_size(self, value, nested=False): + # type: (Any, bool) -> int + # Note that VarInts are encoded the same way regardless of nesting. + return get_varint_size(value & 0xFFFFFFFF) + + class SingletonCoderImpl(CoderImpl): """For internal use only; no backwards-compatibility guarantees. 
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e6250532aef1..cb23e3967e33 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -629,7 +629,7 @@ def __repr__(self): class VarIntCoder(FastCoder): - """Variable-length integer coder.""" + """Variable-length integer coder matches Java SDK's VarLongCoder.""" def _create_impl(self): return coder_impl.VarIntCoderImpl() @@ -650,6 +650,25 @@ def __hash__(self): Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder) +class VarInt32Coder(FastCoder): + """Variable-length integer coder matches Java SDK's VarIntCoder.""" + def _create_impl(self): + return coder_impl.VarInt32CoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + def to_type_hint(self): + return int + + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + + class BigEndianShortCoder(FastCoder): """A coder used for big-endian int16 values.""" def _create_impl(self): diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index f3381cdb1d69..bed93cbc5545 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -318,6 +318,20 @@ def test_varint_coder(self): for k in range(0, int(math.log(MAX_64_BIT_INT))) ]) + def test_varint32_coder(self): + # Small ints. 
+ self.check_coder(coders.VarInt32Coder(), *range(-10, 10)) + # Multi-byte encoding starts at 128 + self.check_coder(coders.VarInt32Coder(), *range(120, 140)) + # Large values + MAX_32_BIT_INT = 0x7fffffff + self.check_coder( + coders.VarIntCoder(), + *[ + int(math.pow(-1, k) * math.exp(k)) + for k in range(0, int(math.log(MAX_32_BIT_INT))) + ]) + def test_float_coder(self): self.check_coder( coders.FloatCoder(), *[float(0.1 * x) for x in range(-100, 100)]) diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index e93abbc887fb..dd4f85ae081c 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -34,6 +34,7 @@ from apache_beam.coders.coders import StrUtf8Coder from apache_beam.coders.coders import TimestampCoder from apache_beam.coders.coders import VarIntCoder +from apache_beam.coders.coders import VarInt32Coder from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import row_type @@ -142,8 +143,10 @@ def _coder_from_type(field_type): def _nonnull_coder_from_type(field_type): type_info = field_type.WhichOneof("type_info") if type_info == "atomic_type": - if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64): + if field_type.atomic_type == schema_pb2.INT64: return VarIntCoder() + elif field_type.atomic_type == schema_pb2.INT32: + return VarInt32Coder() if field_type.atomic_type == schema_pb2.INT16: return BigEndianShortCoder() elif field_type.atomic_type == schema_pb2.FLOAT: diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py index 6ac982835cb3..4d47bca3e2b2 100644 --- a/sdks/python/apache_beam/coders/row_coder_test.py +++ b/sdks/python/apache_beam/coders/row_coder_test.py @@ -203,6 +203,29 @@ def test_create_row_coder_from_schema(self): for test_case in self.PEOPLE: self.assertEqual(test_case, 
coder.decode(coder.encode(test_case))) + def test_row_coder_negative_varint(self): + schema = schema_pb2.Schema( + id="negative", + fields=[ + schema_pb2.Field( + name="i64", + type=schema_pb2.FieldType(atomic_type=schema_pb2.INT64)), + schema_pb2.Field( + name="i32", + type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32)) + ]) + coder = RowCoder(schema) + Negative = typing.NamedTuple( + "Negative", [ + ("i64", np.int64), + ("i32", np.int32), + ]) + test_cases = [ + Negative(-1, -1023), Negative(-1023, -1), Negative(-2**63, -2**31) + ] + for test_case in test_cases: + self.assertEqual(test_case, coder.decode(coder.encode(test_case))) + @unittest.skip( "https://github.com/apache/beam/issues/19696 - Overflow behavior in " "VarIntCoder is currently inconsistent") diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index b08ad8e9a37f..7618b33ba497 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -58,6 +58,9 @@ def write_var_int64(self, v: int) -> None: if not v: break + def write_var_int32(self, v: int) -> None: + self.write_var_int64(v & 0xFFFFFFFF) + def write_bigendian_int64(self, v): self.write(struct.pack('>q', v)) @@ -156,6 +159,10 @@ def read_var_int64(self): result -= 1 << 64 return result + def read_var_int32(self): + v = self.read_var_int64() + return struct.unpack('<i', struct.pack('<I', v))[0] + def read_bigendian_int64(self): return struct.unpack('>q', self.read(8))[0] diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index 97d66aa089a4..de5377cc94f0 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -26,6 +26,7 @@ cdef class OutputStream(object): cpdef write(self, bytes b, bint nested=*) cpdef write_byte(self, unsigned char val) cpdef write_var_int64(self, libc.stdint.int64_t v) + cpdef write_var_int32(self, libc.stdint.int32_t v) cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v) cpdef
write_bigendian_uint64(self, libc.stdint.uint64_t signed_v) cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v) @@ -43,6 +44,8 @@ cdef class ByteCountingOutputStream(OutputStream): cdef size_t count cpdef write(self, bytes b, bint nested=*) + cpdef write_var_int64(self, libc.stdint.int64_t val) + cpdef write_var_int32(self, libc.stdint.int32_t val) cpdef write_byte(self, unsigned char val) cpdef write_bigendian_int64(self, libc.stdint.int64_t val) cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val) @@ -61,6 +64,7 @@ cdef class InputStream(object): cpdef bytes read(self, size_t len) cpdef long read_byte(self) except? -1 cpdef libc.stdint.int64_t read_var_int64(self) except? -1 + cpdef libc.stdint.int32_t read_var_int32(self) except? -1 cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1 cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1 cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1 diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index 3977660f68b0..d9de4afbec79 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -73,6 +73,11 @@ cdef class OutputStream(object): if not v: break + cpdef write_var_int32(self, libc.stdint.int32_t signed_v): + """Encode an int using variable-length encoding to a stream.""" + cdef libc.stdint.int64_t v = signed_v & <libc.stdint.int64_t>(0xFFFFFFFF) + self.write_var_int64(v) + cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v): self.write_bigendian_uint64(signed_v) @@ -91,7 +96,7 @@ cdef class OutputStream(object): cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v): cdef libc.stdint.uint32_t v = signed_v - if self.buffer_size < self.pos + 4: + if self.buffer_size < self.pos + 4: self.extend(4) self.data[self.pos ] = <unsigned char>(v >> 24) self.data[self.pos + 1] = <unsigned char>(v >> 16) @@ -151,6 +156,12 @@ cdef class ByteCountingOutputStream(OutputStream): cpdef write_var_int64(self,
libc.stdint.int64_t signed_v): self.count += get_varint_size(signed_v) + cpdef write_var_int32(self, libc.stdint.int32_t signed_v): + if signed_v < 0: + self.count += 5 + else: + self.count += get_varint_size(signed_v) + cpdef write_byte(self, unsigned char _): self.count += 1 @@ -225,6 +236,11 @@ cdef class InputStream(object): return result + cpdef libc.stdint.int32_t read_var_int32(self) except? -1: + """Decode a variable-length encoded int32 from a stream.""" + cdef libc.stdint.int64_t v = self.read_var_int64() + return <libc.stdint.int32_t>(v); + cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1: return self.read_bigendian_uint64() diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index 57662056b2a0..1e8b2ac11627 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -92,6 +92,27 @@ def test_medium_var_int64(self): def test_large_var_int64(self): self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3]) + def run_read_write_var_int32(self, values): + out_s = self.OutputStream() + for v in values: + out_s.write_var_int32(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEqual(v, in_s.read_var_int32()) + + def test_small_var_int32(self): + self.run_read_write_var_int32(range(-10, 30)) + + def test_medium_var_int32(self): + base = -1.7 + self.run_read_write_var_int32([ + int(base**pow) + for pow in range(1, int(31 * math.log(2) / math.log(-base))) + ]) + + def test_large_var_int32(self): + self.run_read_write_var_int32([0, 2**31 - 1, -2**31, 2**31 - 3]) + def test_read_write_double(self): values = 0, 1, -1, 1e100, 1.0 / 3, math.pi, float('inf') out_s = self.OutputStream() diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 01d868950c03..a08a5704aa26 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++
b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -61,10 +61,10 @@ JdbcTestRow = typing.NamedTuple( "JdbcTestRow", - [("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str), - ("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp), - ("f_decimal", Decimal), ("f_date", datetime.date), - ("f_time", datetime.time)], + [("f_id", int), ("f_id_long", int), ("f_float", float), ("f_char", str), + ("f_varchar", str), ("f_bytes", bytes), ("f_varbytes", bytes), + ("f_timestamp", Timestamp), ("f_decimal", Decimal), + ("f_date", datetime.date), ("f_time", datetime.time)], ) coders.registry.register_coder(JdbcTestRow, coders.RowCoder) @@ -184,7 +184,7 @@ def create_test_table(self, connection, table_name, database): connection.execute( sqlalchemy.text( f"CREATE TABLE IF NOT EXISTS {table_name}" + - "(f_id INTEGER, f_float DOUBLE PRECISION, " + + "(f_id INTEGER, f_id_long BIGINT, f_float DOUBLE PRECISION, " + "f_char CHAR(10), f_varchar VARCHAR(10), " + f"f_bytes {binary_type[0]}, f_varbytes {binary_type[1]}, " + "f_timestamp TIMESTAMP(3), f_decimal DECIMAL(10, 2), " + @@ -193,7 +193,8 @@ def create_test_table(self, connection, table_name, database): def generate_test_data(self, count): return [ JdbcTestRow( - i, + i - 3, + i - 3, i + 0.1, f'Test{i}', f'Test{i}', @@ -225,6 +226,7 @@ def test_xlang_jdbc_write_read(self, database): expected_rows.append( JdbcTestRow( + row.f_id, row.f_id, row.f_float, f_char, @@ -390,7 +392,7 @@ def test_xlang_jdbc_custom_statements(self, database): SimpleRow(2, "Item2", 20.75), SimpleRow(3, "Item3", 30.25), SimpleRow(4, "Item4", 40.0), - SimpleRow(5, "Item5", 50.5) + SimpleRow(-5, "Item5", 50.5) ] config = self.jdbc_configs[database] From 7cb155ddbb5774b5bd7c1939b75e5d2195773333 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 20 Mar 2025 11:02:27 -0400 Subject: [PATCH 2/3] Fix checks --- sdks/python/apache_beam/coders/row_coder.py | 2 +- sdks/python/apache_beam/coders/slow_stream.py | 2 +- 
sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index dd4f85ae081c..dc473b1d6d7d 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -33,8 +33,8 @@ from apache_beam.coders.coders import SinglePrecisionFloatCoder from apache_beam.coders.coders import StrUtf8Coder from apache_beam.coders.coders import TimestampCoder -from apache_beam.coders.coders import VarIntCoder from apache_beam.coders.coders import VarInt32Coder +from apache_beam.coders.coders import VarIntCoder from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import row_type diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index 7618b33ba497..fb4aa50f233d 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -59,7 +59,7 @@ def write_var_int64(self, v: int) -> None: break def write_var_int32(self, v: int) -> None: - self.write_var_int64(v & 0xFFFFFFFF) + self.write_var_int64(int(v) & 0xFFFFFFFF) def write_bigendian_int64(self, v): self.write(struct.pack('>q', v)) diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index a08a5704aa26..ca6fa3d711ca 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -72,6 +72,7 @@ "CustomSchemaRow", [ ("renamed_id", int), + ("renamed_id_long", int), ("renamed_float", float), ("renamed_char", str), ("renamed_varchar", str), @@ -312,6 +313,7 @@ def test_xlang_jdbc_read_with_explicit_schema(self, database): expected_rows.append( CustomSchemaRow( + row.f_id, row.f_id, row.f_float, f_char, @@ -326,6 
+328,7 @@ def test_xlang_jdbc_read_with_explicit_schema(self, database): def custom_row_equals(expected, actual): return ( expected.renamed_id == actual.renamed_id and + expected.renamed_id_long == actual.renamed_id_long and expected.renamed_float == actual.renamed_float and expected.renamed_char.rstrip() == actual.renamed_char.rstrip() and expected.renamed_varchar == actual.renamed_varchar and From 63a19827fe173a25ae6b47d47b393b4ab6c7d32d Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 20 Mar 2025 12:04:51 -0400 Subject: [PATCH 3/3] Handle a race when multiple process downloading jar at the same time --- sdks/python/apache_beam/utils/subprocess_server.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index efb27715cd82..85d9286bddd0 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -423,7 +423,12 @@ def local_jar(cls, url, cache_dir=None): url_read = urlopen(url) with open(cached_jar + '.tmp', 'wb') as jar_write: shutil.copyfileobj(url_read, jar_write, length=1 << 20) - os.rename(cached_jar + '.tmp', cached_jar) + try: + os.rename(cached_jar + '.tmp', cached_jar) + except FileNotFoundError: + # A race when multiple programs run in parallel and the cached_jar + # is already moved. Safe to ignore. + pass except URLError as e: raise RuntimeError( f'Unable to fetch remote job server jar at {url}: {e}. If no '