Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 2 additions & 55 deletions sdks/python/apache_beam/io/avroio.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
Avro file.
"""
# pytype: skip-file
import ctypes
import os
from functools import partial
from typing import Any
Expand Down Expand Up @@ -629,37 +628,11 @@ def to_row(record):
to_row)


def avro_atomic_value_to_beam_atomic_value(avro_type: str, value):
"""convert an avro atomic value to a beam atomic value

if the avro type is an int or long, convert the value into from signed to
unsigned because VarInt.java expects the number to be unsigned when
decoding the number.

Args:
avro_type: the avro type of the corresponding value.
value: the avro atomic value.

Returns:
the converted beam atomic value.
"""
if value is None:
return value
elif avro_type == "int":
return ctypes.c_uint32(value).value
elif avro_type == "long":
return ctypes.c_uint64(value).value
else:
return value


def avro_value_to_beam_value(
beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
type_info = beam_type.WhichOneof("type_info")
if type_info == "atomic_type":
avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]
return lambda value: avro_atomic_value_to_beam_atomic_value(
avro_type, value)
return lambda value: value
elif type_info == "array_type":
element_converter = avro_value_to_beam_value(
beam_type.array_type.element_type)
Expand Down Expand Up @@ -767,37 +740,11 @@ def beam_row_to_avro_dict(
return lambda row: convert(row[0])


def beam_atomic_value_to_avro_atomic_value(avro_type: str, value):
"""convert a beam atomic value to an avro atomic value

since numeric values are converted to unsigned in
avro_atomic_value_to_beam_atomic_value we need to convert
back to a signed number.

Args:
avro_type: avro type of the corresponding value.
value: the beam atomic value.

Returns:
the converted avro atomic value.
"""
if value is None:
return value
elif avro_type == "int":
return ctypes.c_int32(value).value
elif avro_type == "long":
return ctypes.c_int64(value).value
else:
return value


def beam_value_to_avro_value(
beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
type_info = beam_type.WhichOneof("type_info")
if type_info == "atomic_type":
avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]
return lambda value: beam_atomic_value_to_avro_atomic_value(
avro_type, value)
return lambda value: value
elif type_info == "array_type":
element_converter = beam_value_to_avro_value(
beam_type.array_type.element_type)
Expand Down
22 changes: 0 additions & 22 deletions sdks/python/apache_beam/io/avroio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
from apache_beam.io.avroio import _FastAvroSource # For testing
from apache_beam.io.avroio import avro_schema_to_beam_schema # For testing
from apache_beam.io.avroio import beam_schema_to_avro_schema # For testing
from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing
from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing
from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing
from apache_beam.io.avroio import avro_dict_to_beam_row # For testing
from apache_beam.io.avroio import beam_row_to_avro_dict # For testing
from apache_beam.io.avroio import _create_avro_sink # For testing
Expand Down Expand Up @@ -195,26 +193,6 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self):
| beam.Map(beam_row_to_avro_dict(avro_schema, beam_schema)))
assert_that(readback, equal_to(records))

def test_avro_atomic_value_to_beam_atomic_value(self):
input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff),
('int', None, None), ('long', 1, 1),
('long', -1, 0xffffffffffffffff), ('long', None, None),
('string', 'foo', 'foo')]
for test_avro_type, test_value, expected_value in input_outputs:
actual_value = avro_atomic_value_to_beam_atomic_value(
test_avro_type, test_value)
hc.assert_that(actual_value, hc.equal_to(expected_value))

def test_beam_atomic_value_to_avro_atomic_value(self):
input_outputs = [('int', 1, 1), ('int', 0xffffffff, -1),
('int', None, None), ('long', 1, 1),
('long', 0xffffffffffffffff, -1), ('long', None, None),
('string', 'foo', 'foo')]
for test_avro_type, test_value, expected_value in input_outputs:
actual_value = beam_atomic_value_to_avro_atomic_value(
test_avro_type, test_value)
hc.assert_that(actual_value, hc.equal_to(expected_value))

def test_avro_union_type_to_beam_type_with_nullable_long(self):
union_type = ['null', 'long']
beam_type = avro_union_type_to_beam_type(union_type)
Expand Down
Loading