diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 8b7958a00b80..d22ac84fea36 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -43,7 +43,6 @@ Avro file. """ # pytype: skip-file -import ctypes import os from functools import partial from typing import Any @@ -629,37 +628,11 @@ def to_row(record): to_row) -def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): - """convert an avro atomic value to a beam atomic value - - if the avro type is an int or long, convert the value into from signed to - unsigned because VarInt.java expects the number to be unsigned when - decoding the number. - - Args: - avro_type: the avro type of the corresponding value. - value: the avro atomic value. - - Returns: - the converted beam atomic value. - """ - if value is None: - return value - elif avro_type == "int": - return ctypes.c_uint32(value).value - elif avro_type == "long": - return ctypes.c_uint64(value).value - else: - return value - - def avro_value_to_beam_value( beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] - return lambda value: avro_atomic_value_to_beam_atomic_value( - avro_type, value) + return lambda value: value elif type_info == "array_type": element_converter = avro_value_to_beam_value( beam_type.array_type.element_type) @@ -767,37 +740,11 @@ def beam_row_to_avro_dict( return lambda row: convert(row[0]) -def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): - """convert a beam atomic value to an avro atomic value - - since numeric values are converted to unsigned in - avro_atomic_value_to_beam_atomic_value we need to convert - back to a signed number. - - Args: - avro_type: avro type of the corresponding value. - value: the beam atomic value. - - Returns: - the converted avro atomic value. - """ - if value is None: - return value - elif avro_type == "int": - return ctypes.c_int32(value).value - elif avro_type == "long": - return ctypes.c_int64(value).value - else: - return value - - def beam_value_to_avro_value( beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] - return lambda value: beam_atomic_value_to_avro_atomic_value( - avro_type, value) + return lambda value: value elif type_info == "array_type": element_converter = beam_value_to_avro_value( beam_type.array_type.element_type) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 77b20117e702..d4fc3259594a 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -40,9 +40,7 @@ from apache_beam.io.avroio import _FastAvroSource # For testing from apache_beam.io.avroio import avro_schema_to_beam_schema # For testing from apache_beam.io.avroio import beam_schema_to_avro_schema # For testing -from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing -from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing from apache_beam.io.avroio import avro_dict_to_beam_row # For testing from apache_beam.io.avroio import beam_row_to_avro_dict # For testing from apache_beam.io.avroio import _create_avro_sink # For testing @@ -195,26 +193,6 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): | beam.Map(beam_row_to_avro_dict(avro_schema, beam_schema))) assert_that(readback, equal_to(records)) - def test_avro_atomic_value_to_beam_atomic_value(self): - input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff), - ('int', None, None), ('long', 1, 1), - ('long', -1, 0xffffffffffffffff), ('long', None, None), - ('string', 'foo', 'foo')] - for test_avro_type, test_value, expected_value in input_outputs: - actual_value = avro_atomic_value_to_beam_atomic_value( - test_avro_type, test_value) - hc.assert_that(actual_value, hc.equal_to(expected_value)) - - def test_beam_atomic_value_to_avro_atomic_value(self): - input_outputs = [('int', 1, 1), ('int', 0xffffffff, -1), - ('int', None, None), ('long', 1, 1), - ('long', 0xffffffffffffffff, -1), ('long', None, None), - ('string', 'foo', 'foo')] - for test_avro_type, test_value, expected_value in input_outputs: - actual_value = beam_atomic_value_to_avro_atomic_value( - test_avro_type, test_value) - hc.assert_that(actual_value, hc.equal_to(expected_value)) - def test_avro_union_type_to_beam_type_with_nullable_long(self): union_type = ['null', 'long'] beam_type = avro_union_type_to_beam_type(union_type)