Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 5
"modification": 4
}
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ cdef class VarIntCoderImpl(StreamCoderImpl):
cpdef bytes encode(self, value)


# Cython declaration for the VarInt32CoderImpl class implemented in
# coder_impl.py. `encode` is exposed as a cpdef method returning bytes, and its
# local variable `ivalue` is typed as a C int32_t.
cdef class VarInt32CoderImpl(StreamCoderImpl):
@cython.locals(ivalue=libc.stdint.int32_t)
cpdef bytes encode(self, value)


cdef class SingletonCoderImpl(CoderImpl):
cdef object _value

Expand Down
31 changes: 31 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,37 @@ def estimate_size(self, value, nested=False):
return get_varint_size(value)


class VarInt32CoderImpl(StreamCoderImpl):
  """For internal use only; no backwards-compatibility guarantees.

  Coder implementation for int32 values using the variable-length int
  stream primitives (write_var_int32 / read_var_int32)."""
  def encode_to_stream(self, value, out, nested):
    # type: (int, create_OutputStream, bool) -> None
    # ``nested`` is not consulted.
    out.write_var_int32(value)

  def decode_from_stream(self, in_stream, nested):
    # type: (create_InputStream, bool) -> int
    # ``nested`` is not consulted.
    return in_stream.read_var_int32()

  def encode(self, value):
    # The name ``ivalue`` is kept as-is: the .pxd declaration types this local
    # by name.
    ivalue = value  # type cast
    # Anything outside the small_ints table goes through the stream encoder.
    if not (0 <= ivalue < len(small_ints)):
      return StreamCoderImpl.encode(self, value)
    # Otherwise reuse the table entry (small_ints is defined elsewhere in this
    # module; presumably pre-encoded bytes -- confirm there).
    return small_ints[ivalue]

  def decode(self, encoded):
    # Inputs that are not exactly one byte long take the general stream path.
    if len(encoded) != 1:
      return StreamCoderImpl.decode(self, encoded)
    single_byte = ord(encoded)
    if 0 <= single_byte < 128:
      # A lone byte below 128 is returned directly as the decoded value.
      return single_byte
    return StreamCoderImpl.decode(self, encoded)

  def estimate_size(self, value, nested=False):
    # type: (Any, bool) -> int
    # Note that VarInts are encoded the same way regardless of nesting.
    # The size is computed on the low 32 bits of the value.
    low_32_bits = value & 0xFFFFFFFF
    return get_varint_size(low_32_bits)


class SingletonCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Expand Down
21 changes: 20 additions & 1 deletion sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ def __repr__(self):


class VarIntCoder(FastCoder):
"""Variable-length integer coder."""
"""Variable-length integer coder matches Java SDK's VarLongCoder."""
def _create_impl(self):
return coder_impl.VarIntCoderImpl()

Expand All @@ -650,6 +650,25 @@ def __hash__(self):
Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder)


class VarInt32Coder(FastCoder):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add this to CHANGES.md?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will do as a follow up (otherwise need to rebase this PR onto latest master to be able to add to 2.65.0 section)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened #34408

"""Variable-length integer coder matches Java SDK's VarIntCoder."""
# Returns a new coder_impl.VarInt32CoderImpl instance for this coder.
def _create_impl(self):
return coder_impl.VarInt32CoderImpl()

# Always reports this coder as deterministic.
def is_deterministic(self):
# type: () -> bool
return True

# The Python type hint associated with this coder is the builtin int.
def to_type_hint(self):
return int

# Two coders are equal only when they are of exactly the same type; the
# comparison does not look at any instance state.
def __eq__(self, other):
return type(self) == type(other)

# Hash on the type so that it agrees with __eq__ above.
def __hash__(self):
return hash(type(self))


class BigEndianShortCoder(FastCoder):
"""A coder used for big-endian int16 values."""
def _create_impl(self):
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,20 @@ def test_varint_coder(self):
for k in range(0, int(math.log(MAX_64_BIT_INT)))
])

def test_varint32_coder(self):
  """Round-trip small, multi-byte and large values through VarInt32Coder."""
  # Small ints.
  self.check_coder(coders.VarInt32Coder(), *range(-10, 10))
  # Multi-byte encoding starts at 128
  self.check_coder(coders.VarInt32Coder(), *range(120, 140))
  # Large values. These must go through VarInt32Coder as well; using
  # VarIntCoder here would leave the 32-bit coder untested for large inputs.
  MAX_32_BIT_INT = 0x7fffffff
  self.check_coder(
      coders.VarInt32Coder(),
      *[
          # Alternating-sign values of magnitude e**k, for k below
          # log(MAX_32_BIT_INT), so every value fits in an int32.
          int(math.pow(-1, k) * math.exp(k))
          for k in range(0, int(math.log(MAX_32_BIT_INT)))
      ])

def test_float_coder(self):
self.check_coder(
coders.FloatCoder(), *[float(0.1 * x) for x in range(-100, 100)])
Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/coders/row_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from apache_beam.coders.coders import SinglePrecisionFloatCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.coders.coders import TimestampCoder
from apache_beam.coders.coders import VarInt32Coder
from apache_beam.coders.coders import VarIntCoder
from apache_beam.portability import common_urns
from apache_beam.portability.api import schema_pb2
Expand Down Expand Up @@ -142,8 +143,10 @@ def _coder_from_type(field_type):
def _nonnull_coder_from_type(field_type):
type_info = field_type.WhichOneof("type_info")
if type_info == "atomic_type":
if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
if field_type.atomic_type == schema_pb2.INT64:
return VarIntCoder()
elif field_type.atomic_type == schema_pb2.INT32:
return VarInt32Coder()
if field_type.atomic_type == schema_pb2.INT16:
return BigEndianShortCoder()
elif field_type.atomic_type == schema_pb2.FLOAT:
Expand Down
23 changes: 23 additions & 0 deletions sdks/python/apache_beam/coders/row_coder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,29 @@ def test_create_row_coder_from_schema(self):
for test_case in self.PEOPLE:
self.assertEqual(test_case, coder.decode(coder.encode(test_case)))

def test_row_coder_negative_varint(self):
  """Negative INT64 and INT32 row fields survive an encode/decode cycle."""
  int64_field = schema_pb2.Field(
      name="i64", type=schema_pb2.FieldType(atomic_type=schema_pb2.INT64))
  int32_field = schema_pb2.Field(
      name="i32", type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32))
  row_coder = RowCoder(
      schema_pb2.Schema(id="negative", fields=[int64_field, int32_field]))
  Negative = typing.NamedTuple(
      "Negative", [("i64", np.int64), ("i32", np.int32)])
  # The last row holds the minimum value of each width.
  rows = (
      Negative(-1, -1023),
      Negative(-1023, -1),
      Negative(-2**63, -2**31),
  )
  for row in rows:
    round_tripped = row_coder.decode(row_coder.encode(row))
    self.assertEqual(row, round_tripped)

@unittest.skip(
"https://github.com/apache/beam/issues/19696 - Overflow behavior in "
"VarIntCoder is currently inconsistent")
Expand Down
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/coders/slow_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def write_var_int64(self, v: int) -> None:
if not v:
break

def write_var_int32(self, v: int) -> None:
  """Write ``v`` as a var-int after reducing it to its low 32 bits.

  A negative input is therefore written as its unsigned 32-bit equivalent.
  """
  low_32_bits = int(v) & 0xFFFFFFFF
  self.write_var_int64(low_32_bits)

def write_bigendian_int64(self, v):
self.write(struct.pack('>q', v))

Expand Down Expand Up @@ -156,6 +159,10 @@ def read_var_int64(self):
result -= 1 << 64
return result

def read_var_int32(self):
  """Read a var-int and return it as a signed 32-bit integer.

  Only the low 32 bits of the value returned by ``read_var_int64`` are kept
  and they are reinterpreted as two's complement. This matches the Cython
  ``InputStream.read_var_int32``, which casts the 64-bit result to int32_t.

  Returns:
    An int in the range [-2**31, 2**31 - 1].
  """
  # Mask first so that a decoded value outside [0, 2**32) -- for example a
  # negative number that was written with the 64-bit encoding -- is truncated
  # rather than raising.
  low_32_bits = self.read_var_int64() & 0xFFFFFFFF
  if low_32_bits >= 1 << 31:
    # Sign bit set: shift into the negative range.
    low_32_bits -= 1 << 32
  return low_32_bits

def read_bigendian_int64(self):
return struct.unpack('>q', self.read(8))[0]

Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/coders/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ cdef class OutputStream(object):
cpdef write(self, bytes b, bint nested=*)
cpdef write_byte(self, unsigned char val)
cpdef write_var_int64(self, libc.stdint.int64_t v)
cpdef write_var_int32(self, libc.stdint.int32_t v)
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v)
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
Expand All @@ -43,6 +44,8 @@ cdef class ByteCountingOutputStream(OutputStream):
cdef size_t count

cpdef write(self, bytes b, bint nested=*)
cpdef write_var_int64(self, libc.stdint.int64_t val)
cpdef write_var_int32(self, libc.stdint.int32_t val)
cpdef write_byte(self, unsigned char val)
cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val)
Expand All @@ -61,6 +64,7 @@ cdef class InputStream(object):
cpdef bytes read(self, size_t len)
cpdef long read_byte(self) except? -1
cpdef libc.stdint.int64_t read_var_int64(self) except? -1
cpdef libc.stdint.int32_t read_var_int32(self) except? -1
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
Expand Down
18 changes: 17 additions & 1 deletion sdks/python/apache_beam/coders/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ cdef class OutputStream(object):
if not v:
break

cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
"""Encode an int32 using variable-length encoding to a stream.

The value is reduced to its low 32 bits and handed to write_var_int64, so
a negative input is written as its unsigned 32-bit equivalent.
"""
# The mask is cast to int64 so the AND is done in 64 bits and yields a
# non-negative result.
cdef libc.stdint.int64_t v = signed_v & <libc.stdint.int64_t>(0xFFFFFFFF)
self.write_var_int64(v)

cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
self.write_bigendian_uint64(signed_v)

Expand All @@ -91,7 +96,7 @@ cdef class OutputStream(object):

cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
cdef libc.stdint.uint32_t v = signed_v
if self.buffer_size < self.pos + 4:
if self.buffer_size < self.pos + 4:
self.extend(4)
self.data[self.pos ] = <unsigned char>(v >> 24)
self.data[self.pos + 1] = <unsigned char>(v >> 16)
Expand Down Expand Up @@ -151,6 +156,12 @@ cdef class ByteCountingOutputStream(OutputStream):
cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
self.count += get_varint_size(signed_v)

cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
# Adds the encoded size of signed_v to self.count.
if signed_v < 0:
# Negative values are counted as a fixed 5 bytes -- presumably the size of
# the masked 32-bit form that OutputStream.write_var_int32 emits; confirm
# against get_varint_size.
self.count += 5
else:
self.count += get_varint_size(signed_v)

cpdef write_byte(self, unsigned char _):
self.count += 1

Expand Down Expand Up @@ -225,6 +236,11 @@ cdef class InputStream(object):

return result

cpdef libc.stdint.int32_t read_var_int32(self) except? -1:
  """Decode a variable-length encoded int32 from a stream.

  The value is read with read_var_int64 and then cast to int32_t, so only
  its low 32 bits are kept.
  """
  cdef libc.stdint.int64_t v = self.read_var_int64()
  return <libc.stdint.int32_t>(v)

cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
return self.read_bigendian_uint64()

Expand Down
21 changes: 21 additions & 0 deletions sdks/python/apache_beam/coders/stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ def test_medium_var_int64(self):
def test_large_var_int64(self):
self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])

def run_read_write_var_int32(self, values):
  """Write every value as a var-int32, then read them back in order.

  Args:
    values: an iterable of ints; it is iterated twice (write pass, then
      read-and-compare pass).
  """
  writer = self.OutputStream()
  for value in values:
    writer.write_var_int32(value)
  reader = self.InputStream(writer.get())
  for expected in values:
    self.assertEqual(expected, reader.read_var_int32())

def test_small_var_int32(self):
  """Round-trip the integers from -10 up to, but not including, 30."""
  small_values = range(-10, 30)
  self.run_read_write_var_int32(small_values)

def test_medium_var_int32(self):
  """Round-trip alternating-sign powers of -1.7 that fit in 32 bits."""
  base = -1.7
  # Exclusive exponent bound: keeps |base|**exponent below 2**31.
  exponent_limit = int(31 * math.log(2) / math.log(-base))
  self.run_read_write_var_int32(
      [int(base**exponent) for exponent in range(1, exponent_limit)])

def test_large_var_int32(self):
  """Round-trip zero and values at or near the int32 limits."""
  boundary_values = [0, 2**31 - 1, -2**31, 2**31 - 3]
  self.run_read_write_var_int32(boundary_values)

def test_read_write_double(self):
values = 0, 1, -1, 1e100, 1.0 / 3, math.pi, float('inf')
out_s = self.OutputStream()
Expand Down
19 changes: 12 additions & 7 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@

JdbcTestRow = typing.NamedTuple(
"JdbcTestRow",
[("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str),
("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp),
("f_decimal", Decimal), ("f_date", datetime.date),
("f_time", datetime.time)],
[("f_id", int), ("f_id_long", int), ("f_float", float), ("f_char", str),
("f_varchar", str), ("f_bytes", bytes), ("f_varbytes", bytes),
("f_timestamp", Timestamp), ("f_decimal", Decimal),
("f_date", datetime.date), ("f_time", datetime.time)],
)
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)

CustomSchemaRow = typing.NamedTuple(
"CustomSchemaRow",
[
("renamed_id", int),
("renamed_id_long", int),
("renamed_float", float),
("renamed_char", str),
("renamed_varchar", str),
Expand Down Expand Up @@ -184,7 +185,7 @@ def create_test_table(self, connection, table_name, database):
connection.execute(
sqlalchemy.text(
f"CREATE TABLE IF NOT EXISTS {table_name}" +
"(f_id INTEGER, f_float DOUBLE PRECISION, " +
"(f_id INTEGER, f_id_long BIGINT, f_float DOUBLE PRECISION, " +
"f_char CHAR(10), f_varchar VARCHAR(10), " +
f"f_bytes {binary_type[0]}, f_varbytes {binary_type[1]}, " +
"f_timestamp TIMESTAMP(3), f_decimal DECIMAL(10, 2), " +
Expand All @@ -193,7 +194,8 @@ def create_test_table(self, connection, table_name, database):
def generate_test_data(self, count):
return [
JdbcTestRow(
i,
i - 3,
i - 3,
i + 0.1,
f'Test{i}',
f'Test{i}',
Expand Down Expand Up @@ -225,6 +227,7 @@ def test_xlang_jdbc_write_read(self, database):

expected_rows.append(
JdbcTestRow(
row.f_id,
row.f_id,
row.f_float,
f_char,
Expand Down Expand Up @@ -310,6 +313,7 @@ def test_xlang_jdbc_read_with_explicit_schema(self, database):

expected_rows.append(
CustomSchemaRow(
row.f_id,
row.f_id,
row.f_float,
f_char,
Expand All @@ -324,6 +328,7 @@ def test_xlang_jdbc_read_with_explicit_schema(self, database):
def custom_row_equals(expected, actual):
return (
expected.renamed_id == actual.renamed_id and
expected.renamed_id_long == actual.renamed_id_long and
expected.renamed_float == actual.renamed_float and
expected.renamed_char.rstrip() == actual.renamed_char.rstrip() and
expected.renamed_varchar == actual.renamed_varchar and
Expand Down Expand Up @@ -390,7 +395,7 @@ def test_xlang_jdbc_custom_statements(self, database):
SimpleRow(2, "Item2", 20.75),
SimpleRow(3, "Item3", 30.25),
SimpleRow(4, "Item4", 40.0),
SimpleRow(5, "Item5", 50.5)
SimpleRow(-5, "Item5", 50.5)
]

config = self.jdbc_configs[database]
Expand Down
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,12 @@ def local_jar(cls, url, cache_dir=None):
url_read = urlopen(url)
with open(cached_jar + '.tmp', 'wb') as jar_write:
shutil.copyfileobj(url_read, jar_write, length=1 << 20)
os.rename(cached_jar + '.tmp', cached_jar)
try:
os.rename(cached_jar + '.tmp', cached_jar)
except FileNotFoundError:
# A race when multiple programs run in parallel and the cached_jar
# is already moved. Safe to ignore.
pass
except URLError as e:
raise RuntimeError(
f'Unable to fetch remote job server jar at {url}: {e}. If no '
Expand Down
Loading