From dae17a5dc25b5ab34b5e150a3de398cdc2e27c19 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Wed, 27 Mar 2024 14:07:05 -0400 Subject: [PATCH 01/15] fix several bugs regarding avto <-> beam schema conversion --- sdks/python/apache_beam/io/avroio.py | 66 +++++++++++++++++++---- sdks/python/apache_beam/io/avroio_test.py | 25 ++++++++- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 24df59ddc5cc..c7c1a17afda7 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -65,6 +65,8 @@ from apache_beam.portability.api import schema_pb2 from apache_beam.transforms import PTransform from apache_beam.typehints import schemas +from apache_beam import coders +import ctypes __all__ = [ 'ReadFromAvro', @@ -544,12 +546,26 @@ def close(self, writer): _AvroSchemaType = Union[str, List, Dict] +# if the union type is a nullable and it is a nullable union of an avro primitive with a corresponding beam primitive +# then create a nullable beam field of the corresponding beam type, otherwise return an Any type +def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType: + if len(union_type) == 2 and "null" in union_type: + for avro_type in union_type: + if avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES: + return schema_pb2.FieldType( + atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[avro_type], + nullable=True) + else: + schemas.typing_to_runner_api(Any) + return schemas.typing_to_runner_api(Any) + + def avro_type_to_beam_type(avro_type: _AvroSchemaType) -> schema_pb2.FieldType: if isinstance(avro_type, str): return avro_type_to_beam_type({'type': avro_type}) elif isinstance(avro_type, list): # Union type - return schemas.typing_to_runner_api(Any) + return avro_union_type_to_beam_type(avro_type) type_name = avro_type['type'] if type_name in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES: return schema_pb2.FieldType( @@ -605,11 +621,25 @@ def to_row(record): to_row) +# convert an avro atomic value to a beam atomic value +# if the avro type is an int or long, convert the value into from signed to unsigned +# because VarInt.java expects the number to be unsigned when decoding the number +def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): + if avro_type == "int": + return ctypes.c_uint32(value).value + elif avro_type == "long": + return ctypes.c_uint64(value).value + else: + return value + + def avro_value_to_beam_value( beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - return lambda value: value + avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] + return lambda value: avro_atomic_value_to_beam_atomic_value( + avro_type, value) elif type_info == "array_type": element_converter = avro_value_to_beam_value( beam_type.array_type.element_type) @@ -649,7 +679,11 @@ def beam_schema_to_avro_schema( def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - return {'type': BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]} + avro_primitive = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] + if beam_type.nullable: + return ['null', avro_primitive] + else: + return {'type': avro_primitive} elif type_info == "array_type": return { 'type': 'array', @@ -693,29 +727,43 @@ def beam_row_to_avro_dict( return lambda row: convert(row[0]) +# convert a beam atomic value to an avro atomic value +# since numeric values are converted to unsigned in avro_atomic_value_to_beam_atomic_value +# we need to convert back to a signed number +def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): + if avro_type == "int": + return ctypes.c_int32(value).value + elif avro_type == "long": + return ctypes.c_int64(value).value + else: + return value + + def beam_value_to_avro_value( beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": - return lambda value: value + avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] + return lambda value: beam_atomic_value_to_avro_atomic_value( + avro_type, value) elif type_info == "array_type": - element_converter = avro_value_to_beam_value( + element_converter = beam_value_to_avro_value( beam_type.array_type.element_type) return lambda value: [element_converter(e) for e in value] elif type_info == "iterable_type": - element_converter = avro_value_to_beam_value( + element_converter = beam_value_to_avro_value( beam_type.iterable_type.element_type) return lambda value: [element_converter(e) for e in value] elif type_info == "map_type": if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: raise TypeError( - f'Only strings allowd as map keys when converting from AVRO, ' + f'Only strings allowed as map keys when converting from AVRO, ' f'found {beam_type}') - value_converter = avro_value_to_beam_value(beam_type.map_type.value_type) + value_converter = beam_value_to_avro_value(beam_type.map_type.value_type) return lambda value: {k: value_converter(v) for (k, v) in value.items()} elif type_info == "row_type": converters = { - field.name: avro_value_to_beam_value(field.type) + field.name: beam_value_to_avro_value(field.type) for field in beam_type.row_type.schema.fields } return lambda value: { diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index c54ac40711b1..7139672e3154 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -24,6 +24,7 @@ import unittest from typing import List +import fastavro import hamcrest as hc from fastavro.schema import parse_schema @@ -46,6 +47,8 @@ from apache_beam.transforms.userstate import CombiningValueStateSpec from apache_beam.utils.timestamp import Timestamp +from sdks.python.apache_beam.transforms.sql import SqlTransform + # Import snappy optionally; some tests will be skipped when import fails. try: import snappy # pylint: disable=import-error @@ -149,7 +152,7 @@ def _run_avro_test( def test_schema_read_write(self): with tempfile.TemporaryDirectory() as tmp_dirname: path = os.path.join(tmp_dirname, 'tmp_filename') - rows = [beam.Row(a=1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])] + rows = [beam.Row(a=-1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])] stable_repr = lambda row: json.dumps(row._asdict()) with TestPipeline() as p: _ = p | Create(rows) | avroio.WriteToAvro(path) | beam.Map(print) @@ -157,9 +160,29 @@ def test_schema_read_write(self): readback = ( p | avroio.ReadFromAvro(path + '*', as_rows=True) + # | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(stable_repr)) assert_that(readback, equal_to([stable_repr(r) for r in rows])) + def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): + with tempfile.TemporaryDirectory() as tmp_dirname_input: + input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro') + parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING)) + with open(input_path, 'wb') as tmp_avro_file: + fastavro.writer(tmp_avro_file, parsed_schema, self.RECORDS) + + with tempfile.TemporaryDirectory() as tmp_dirname_output: + + with TestPipeline() as p: + _ = ( + p + | avroio.ReadFromAvro(input_path, as_rows=True) + | SqlTransform("SELECT * FROM PCOLLECTION") + | avroio.WriteToAvro(tmp_dirname_output)) + with TestPipeline() as p: + readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*")) + assert_that(readback, equal_to(RECORDS)) + def test_read_without_splitting(self): file_name = self._write_data() expected_result = self.RECORDS From a30f46c8e9a6f4c890d356b6986f4efb2bbab8d0 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Wed, 27 Mar 2024 14:23:42 -0400 Subject: [PATCH 02/15] fix test with my bizzare workaround --- sdks/python/apache_beam/io/avroio_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 7139672e3154..f0f8baf303a9 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -160,7 +160,7 @@ def test_schema_read_write(self): readback = ( p | avroio.ReadFromAvro(path + '*', as_rows=True) - # | SqlTransform("SELECT * FROM PCOLLECTION") + | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(stable_repr)) assert_that(readback, equal_to([stable_repr(r) for r in rows])) From 654827b9117063417d61bf0ce0b5c8c174eeb204 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Wed, 27 Mar 2024 14:55:30 -0400 Subject: [PATCH 03/15] fix import in test --- sdks/python/apache_beam/io/avroio_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index f0f8baf303a9..4ea17635ff5a 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -44,11 +44,10 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher +from apache_beam.transforms.sql import SqlTransform from apache_beam.transforms.userstate import CombiningValueStateSpec from apache_beam.utils.timestamp import Timestamp -from sdks.python.apache_beam.transforms.sql import SqlTransform - # Import snappy optionally; some tests will be skipped when import fails. try: import snappy # pylint: disable=import-error From a2524b076e387edc686f0d0b21bc75344c5c4991 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Wed, 27 Mar 2024 16:04:15 -0400 Subject: [PATCH 04/15] fix linting issue --- sdks/python/apache_beam/io/avroio.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index c7c1a17afda7..aa13350b228c 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -43,6 +43,7 @@ Avro file. """ # pytype: skip-file +import ctypes import os from functools import partial from typing import Any @@ -65,8 +66,6 @@ from apache_beam.portability.api import schema_pb2 from apache_beam.transforms import PTransform from apache_beam.typehints import schemas -from apache_beam import coders -import ctypes __all__ = [ 'ReadFromAvro', @@ -546,8 +545,9 @@ def close(self, writer): _AvroSchemaType = Union[str, List, Dict] -# if the union type is a nullable and it is a nullable union of an avro primitive with a corresponding beam primitive -# then create a nullable beam field of the corresponding beam type, otherwise return an Any type +# if the union type is a nullable and it is a nullable union of an avro +# primitive with a corresponding beam primitive then create a nullable beam +# field of the corresponding beam type, otherwise return an Any type def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType: if len(union_type) == 2 and "null" in union_type: for avro_type in union_type: @@ -555,8 +555,7 @@ def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType: return schema_pb2.FieldType( atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[avro_type], nullable=True) - else: - schemas.typing_to_runner_api(Any) + return schemas.typing_to_runner_api(Any) return schemas.typing_to_runner_api(Any) @@ -622,8 +621,9 @@ def to_row(record): # convert an avro atomic value to a beam atomic value -# if the avro type is an int or long, convert the value into from signed to unsigned -# because VarInt.java expects the number to be unsigned when decoding the number +# if the avro type is an int or long, convert the value into from signed to +# unsigned because VarInt.java expects the number to be unsigned when +# decoding the number def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): if avro_type == "int": return ctypes.c_uint32(value).value @@ -728,8 +728,9 @@ def beam_row_to_avro_dict( # convert a beam atomic value to an avro atomic value -# since numeric values are converted to unsigned in avro_atomic_value_to_beam_atomic_value -# we need to convert back to a signed number +# since numeric values are converted to unsigned in +# avro_atomic_value_to_beam_atomic_value we need to convert +# back to a signed number def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): if avro_type == "int": return ctypes.c_int32(value).value From c81b912cd5a49f0aed09ad03c25063bc6d0350be Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Thu, 28 Mar 2024 10:01:59 -0400 Subject: [PATCH 05/15] comments -> docstrings --- sdks/python/apache_beam/io/avroio.py | 29 +++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index aa13350b228c..07e1eac8dd2e 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -545,9 +545,12 @@ def close(self, writer): _AvroSchemaType = Union[str, List, Dict] -# if the union type is a nullable and it is a nullable union of an avro -# primitive with a corresponding beam primitive then create a nullable beam -# field of the corresponding beam type, otherwise return an Any type +"""convert an avro union type to a beam type + +if the union type is a nullable and it is a nullable union of an avro +primitive with a corresponding beam primitive then create a nullable beam +field of the corresponding beam type, otherwise return an Any type +""" def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType: if len(union_type) == 2 and "null" in union_type: for avro_type in union_type: @@ -620,10 +623,12 @@ def to_row(record): to_row) -# convert an avro atomic value to a beam atomic value -# if the avro type is an int or long, convert the value into from signed to -# unsigned because VarInt.java expects the number to be unsigned when -# decoding the number +"""convert an avro atomic value to a beam atomic value + +if the avro type is an int or long, convert the value into from signed to +unsigned because VarInt.java expects the number to be unsigned when +decoding the number +""" def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): if avro_type == "int": return ctypes.c_uint32(value).value @@ -727,10 +732,12 @@ def beam_row_to_avro_dict( return lambda row: convert(row[0]) -# convert a beam atomic value to an avro atomic value -# since numeric values are converted to unsigned in -# avro_atomic_value_to_beam_atomic_value we need to convert -# back to a signed number +"""convert a beam atomic value to an avro atomic value + +since numeric values are converted to unsigned in +avro_atomic_value_to_beam_atomic_value we need to convert +back to a signed number +""" def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): if avro_type == "int": return ctypes.c_int32(value).value From 0441140fdabb48a86eca546421d338f66d702b99 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Thu, 28 Mar 2024 10:19:05 -0400 Subject: [PATCH 06/15] formatting --- sdks/python/apache_beam/io/avroio.py | 36 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 07e1eac8dd2e..9cdee45fceaa 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -545,13 +545,13 @@ def close(self, writer): _AvroSchemaType = Union[str, List, Dict] -"""convert an avro union type to a beam type - -if the union type is a nullable and it is a nullable union of an avro -primitive with a corresponding beam primitive then create a nullable beam -field of the corresponding beam type, otherwise return an Any type -""" def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType: + """convert an avro union type to a beam type + + if the union type is a nullable, and it is a nullable union of an avro + primitive with a corresponding beam primitive then create a nullable beam + field of the corresponding beam type, otherwise return an Any type. + """ if len(union_type) == 2 and "null" in union_type: for avro_type in union_type: if avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES: @@ -623,13 +623,13 @@ def to_row(record): to_row) -"""convert an avro atomic value to a beam atomic value - -if the avro type is an int or long, convert the value into from signed to -unsigned because VarInt.java expects the number to be unsigned when -decoding the number -""" def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): + """convert an avro atomic value to a beam atomic value + + if the avro type is an int or long, convert the value into from signed to + unsigned because VarInt.java expects the number to be unsigned when + decoding the number. + """ if avro_type == "int": return ctypes.c_uint32(value).value elif avro_type == "long": @@ -732,13 +732,13 @@ def beam_row_to_avro_dict( return lambda row: convert(row[0]) -"""convert a beam atomic value to an avro atomic value - -since numeric values are converted to unsigned in -avro_atomic_value_to_beam_atomic_value we need to convert -back to a signed number -""" def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): + """convert a beam atomic value to an avro atomic value + + since numeric values are converted to unsigned in + avro_atomic_value_to_beam_atomic_value we need to convert + back to a signed number. + """ if avro_type == "int": return ctypes.c_int32(value).value elif avro_type == "long": From 4f4d95885fca8c1dfeb61327a7b4ba3db037033e Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Fri, 29 Mar 2024 14:35:52 -0400 Subject: [PATCH 07/15] review comments --- sdks/python/apache_beam/io/avroio.py | 18 ++++--- sdks/python/apache_beam/io/avroio_test.py | 66 ++++++++++++++++++++--- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 9cdee45fceaa..3e0289ce343f 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -630,7 +630,9 @@ def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): unsigned because VarInt.java expects the number to be unsigned when decoding the number. """ - if avro_type == "int": + if value is None: + return value + elif avro_type == "int": return ctypes.c_uint32(value).value elif avro_type == "long": return ctypes.c_uint64(value).value @@ -656,7 +658,7 @@ def avro_value_to_beam_value( elif type_info == "map_type": if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: raise TypeError( - f'Only strings allowd as map keys when converting from AVRO, ' + f'Only strings allowed as map keys when converting from AVRO, ' f'found {beam_type}') value_converter = avro_value_to_beam_value(beam_type.map_type.value_type) return lambda value: {k: value_converter(v) for (k, v) in value.items()} @@ -686,9 +688,9 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: if type_info == "atomic_type": avro_primitive = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] if beam_type.nullable: - return ['null', avro_primitive] + return [avro_primitive, 'null'] else: - return {'type': avro_primitive} + return avro_primitive elif type_info == "array_type": return { 'type': 'array', @@ -702,7 +704,7 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: elif type_info == "map_type": if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: raise TypeError( - f'Only strings allowd as map keys when converting to AVRO, ' + f'Only strings allowed as map keys when converting to AVRO, ' f'found {beam_type}') return { 'type': 'map', @@ -717,7 +719,7 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: } for field in beam_type.row_type.schema.fields], } else: - raise ValueError(f"Unconvertale type: {beam_type}") + raise ValueError(f"Unconvertable type: {beam_type}") def beam_row_to_avro_dict( @@ -739,7 +741,9 @@ def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): avro_atomic_value_to_beam_atomic_value we need to convert back to a signed number. """ - if avro_type == "int": + if value is None: + return value + elif avro_type == "int": return ctypes.c_int32(value).value elif avro_type == "long": return ctypes.c_int64(value).value diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 4ea17635ff5a..9c559d676f8b 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -20,9 +20,10 @@ import logging import math import os +import pytest import tempfile import unittest -from typing import List +from typing import List, Any import fastavro import hamcrest as hc @@ -31,12 +32,16 @@ from fastavro import writer import apache_beam as beam -from apache_beam import Create +from apache_beam import Create, schema_pb2 from apache_beam.io import avroio from apache_beam.io import filebasedsource from apache_beam.io import iobase from apache_beam.io import source_test_utils -from apache_beam.io.avroio import _FastAvroSource # For testing +from apache_beam.io.avroio import _FastAvroSource, avro_schema_to_beam_schema, \ + beam_schema_to_avro_schema # For testing +from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing +from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing +from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing from apache_beam.io.avroio import _create_avro_sink # For testing from apache_beam.io.filesystems import FileSystems from apache_beam.testing.test_pipeline import TestPipeline @@ -47,6 +52,7 @@ from apache_beam.transforms.sql import SqlTransform from apache_beam.transforms.userstate import CombiningValueStateSpec from apache_beam.utils.timestamp import Timestamp +from apache_beam.typehints import schemas # Import snappy optionally; some tests will be skipped when import fails. try: @@ -151,7 +157,7 @@ def _run_avro_test( def test_schema_read_write(self): with tempfile.TemporaryDirectory() as tmp_dirname: path = os.path.join(tmp_dirname, 'tmp_filename') - rows = [beam.Row(a=-1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])] + rows = [beam.Row(a=1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])] stable_repr = lambda row: json.dumps(row._asdict()) with TestPipeline() as p: _ = p | Create(rows) | avroio.WriteToAvro(path) | beam.Map(print) @@ -159,16 +165,21 @@ def test_schema_read_write(self): readback = ( p | avroio.ReadFromAvro(path + '*', as_rows=True) - | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(stable_repr)) assert_that(readback, equal_to([stable_repr(r) for r in rows])) + @pytest.mark.xlang_sql_expansion_service def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): + records = [] + records.extend(self.RECORDS) + records.append({ + 'name': 'Bruce', 'favorite_number': None, 'favorite_color': None + }) with tempfile.TemporaryDirectory() as tmp_dirname_input: input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro') parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING)) with open(input_path, 'wb') as tmp_avro_file: - fastavro.writer(tmp_avro_file, parsed_schema, self.RECORDS) + fastavro.writer(tmp_avro_file, parsed_schema, records) with tempfile.TemporaryDirectory() as tmp_dirname_output: @@ -180,7 +191,48 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): | avroio.WriteToAvro(tmp_dirname_output)) with TestPipeline() as p: readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*")) - assert_that(readback, equal_to(RECORDS)) + assert_that(readback, equal_to(records)) + + def test_avro_atomic_value_to_beam_atomic_value(self): + input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff), + ('int', None, None), ('long', 1, 1), + ('long', -1, 0xffffffffffffffff), ('long', None, None), + ('string', 'foo', 'foo')] + for test_avro_type, test_value, expected_value in input_outputs: + actual_value = avro_atomic_value_to_beam_atomic_value( + test_avro_type, test_value) + hc.assert_that(actual_value, hc.equal_to(expected_value)) + + def test_beam_atomic_value_to_avro_atomic_value(self): + input_outputs = [('int', 1, 1), ('int', 0xffffffff, -1), + ('int', None, None), ('long', 1, 1), + ('long', 0xffffffffffffffff, -1), ('long', None, None), + ('string', 'foo', 'foo')] + for test_avro_type, test_value, expected_value in input_outputs: + actual_value = beam_atomic_value_to_avro_atomic_value( + test_avro_type, test_value) + hc.assert_that(actual_value, hc.equal_to(expected_value)) + + def test_avro_union_type_to_beam_type_with_nullable_long(self): + union_type = ['null', 'long'] + beam_type = avro_union_type_to_beam_type(union_type) + expected_beam_type = schema_pb2.FieldType( + atomic_type=schema_pb2.INT64, nullable=True) + hc.assert_that(beam_type, hc.equal_to(expected_beam_type)) + + def test_avro_union_type_to_beam_type_with_string_long(self): + union_type = ['string', 'long'] + beam_type = avro_union_type_to_beam_type(union_type) + expected_beam_type = schemas.typing_to_runner_api(Any) + hc.assert_that(beam_type, hc.equal_to(expected_beam_type)) + + def test_avro_schema_to_beam_and_back(self): + avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING)) + beam_schema = avro_schema_to_beam_schema(avro_schema) + converted_avro_schema = beam_schema_to_avro_schema(beam_schema) + expected_fields = json.loads(self.SCHEMA_STRING)["fields"] + hc.assert_that( + converted_avro_schema["fields"], hc.equal_to(expected_fields)) def test_read_without_splitting(self): file_name = self._write_data() From c30bf3802ed9b64bb89ab5773043c20d84649f54 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Mon, 15 Apr 2024 12:08:21 -0400 Subject: [PATCH 08/15] add skip test config to sqltransform test --- sdks/python/apache_beam/io/avroio_test.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 9c559d676f8b..732eb3bc4c4a 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -37,13 +37,15 @@ from apache_beam.io import filebasedsource from apache_beam.io import iobase from apache_beam.io import source_test_utils -from apache_beam.io.avroio import _FastAvroSource, avro_schema_to_beam_schema, \ - beam_schema_to_avro_schema # For testing +from apache_beam.io.avroio import _FastAvroSource # For testing +from apache_beam.io.avroio import avro_schema_to_beam_schema # For testing +from apache_beam.io.avroio import beam_schema_to_avro_schema # For testing from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing from apache_beam.io.avroio import _create_avro_sink # For testing from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -169,6 +171,10 @@ def test_schema_read_write(self): assert_that(readback, equal_to([stable_repr(r) for r in rows])) @pytest.mark.xlang_sql_expansion_service + @unittest.skipIf( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is + None, + "Must be run with a runner that supports staging java artifacts.") def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): records = [] records.extend(self.RECORDS) From bec6d973711882edcd1aa1e3fd8df2896b27c93f Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Mon, 15 Apr 2024 15:07:02 -0400 Subject: [PATCH 09/15] fix docstrings --- sdks/python/apache_beam/io/avroio.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 3e0289ce343f..697461456837 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -551,6 +551,12 @@ def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType: if the union type is a nullable, and it is a nullable union of an avro primitive with a corresponding beam primitive then create a nullable beam field of the corresponding beam type, otherwise return an Any type. + + Args: + union_type: the avro union type to convert. + + Returns: + the beam type of the avro union. """ if len(union_type) == 2 and "null" in union_type: for avro_type in union_type: @@ -629,6 +635,13 @@ def avro_atomic_value_to_beam_atomic_value(avro_type: str, value): if the avro type is an int or long, convert the value into from signed to unsigned because VarInt.java expects the number to be unsigned when decoding the number. + + Args: + avro_type: the avro type of the corresponding value. + value: the avro atomic value. + + Returns: + the converted beam atomic value. """ if value is None: return value @@ -740,6 +753,13 @@ def beam_atomic_value_to_avro_atomic_value(avro_type: str, value): since numeric values are converted to unsigned in avro_atomic_value_to_beam_atomic_value we need to convert back to a signed number. + + Args: + avro_type: avro type of the corresponding value. + value: the beam atomic value. + + Returns: + the converted avro atomic value. """ if value is None: return value From 2334c8d1d36e1971bc72befd947071c5425a46d8 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Mon, 15 Apr 2024 15:23:22 -0400 Subject: [PATCH 10/15] formatting --- sdks/python/apache_beam/io/avroio_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 732eb3bc4c4a..c95fbb612592 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -172,9 +172,9 @@ def test_schema_read_write(self): @pytest.mark.xlang_sql_expansion_service @unittest.skipIf( - TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is - None, - "Must be run with a runner that supports staging java artifacts.") + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is + None, + "Must be run with a runner that supports staging java artifacts.") def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): records = [] records.extend(self.RECORDS) From e07d6b809ebb9a70ba195fdcdf08d7cdb216e940 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Mon, 15 Apr 2024 17:15:14 -0400 Subject: [PATCH 11/15] review comment --- sdks/python/apache_beam/io/avroio.py | 8 ++++---- sdks/python/apache_beam/io/avroio_test.py | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 697461456837..64dc19753598 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -431,6 +431,7 @@ def expand(self, pcoll): "An explicit schema is required to write non-schema'd PCollections." ) from exn avro_schema = beam_schema_to_avro_schema(beam_schema) + print("HERE: ", avro_schema) records = pcoll | beam.Map( beam_row_to_avro_dict(avro_schema, beam_schema)) self._sink = self._sink_provider(avro_schema) @@ -700,10 +701,9 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": avro_primitive = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] - if beam_type.nullable: - return [avro_primitive, 'null'] - else: - return avro_primitive + return [avro_primitive, 'null'] if beam_type.nullable else { + 'type': avro_primitive + } elif type_info == "array_type": return { 'type': 'array', diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index c95fbb612592..2eb27ddee8a6 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -90,7 +90,7 @@ def __init__(self, methodName='runTest'): "type": "record", "name": "User", "fields": [ - {"name": "name", "type": "string"}, + {"name": "name", "type": {"type": "string"}}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] @@ -171,10 +171,10 @@ def test_schema_read_write(self): assert_that(readback, equal_to([stable_repr(r) for r in rows])) @pytest.mark.xlang_sql_expansion_service - @unittest.skipIf( - TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is - None, - "Must be run with a runner that supports staging java artifacts.") + # @unittest.skipIf( + # TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is + # None, + # "Must be run with a runner that supports staging java artifacts.") def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): records = [] records.extend(self.RECORDS) From dfa7d12a26c335e3c1b001d121a7bef0e46964b5 Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Tue, 16 Apr 2024 09:34:12 -0400 Subject: [PATCH 12/15] uncomment test skip --- sdks/python/apache_beam/io/avroio_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 2eb27ddee8a6..0f4920874ca2 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -171,10 +171,10 @@ def test_schema_read_write(self): assert_that(readback, equal_to([stable_repr(r) for r in rows])) @pytest.mark.xlang_sql_expansion_service - # @unittest.skipIf( - # TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is - # None, - # "Must be run with a runner that supports staging java artifacts.") + @unittest.skipIf( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is + None, + "Must be run with a runner that supports staging java artifacts.") def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self): records = [] records.extend(self.RECORDS) From 9d489536af64500149ab2eaae4b38e33e9b68f4a Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Tue, 16 Apr 2024 13:10:20 -0400 Subject: [PATCH 13/15] always return map in beam_type_to_avro_type --- sdks/python/apache_beam/io/avroio.py | 20 ++++++++++++++------ sdks/python/apache_beam/io/avroio_test.py | 2 +- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 64dc19753598..6f03374286d6 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -701,9 +701,10 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": avro_primitive = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type] - return [avro_primitive, 'null'] if beam_type.nullable else { - 'type': avro_primitive - } + avro_type = [ + avro_primitive, 'null' + ] if beam_type.nullable else avro_primitive + return {'type': avro_type} elif type_info == "array_type": return { 'type': 'array', @@ -724,12 +725,19 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: 'values': beam_type_to_avro_type(beam_type.map_type.element_type) } elif type_info == "row_type": + fields = [] + for field in beam_type.row_type.schema.fields: + avro_type = beam_type_to_avro_type(field.type) + # if mapping to a avro primitive or a union, don't nest the field type + # for complex types, like arrays, we need to nest the type + field_type = avro_type['type'] if field.type.WhichOneof( + "type_info") == "atomic_type" else avro_type + avro_field = {'name': field.name, 'type': field_type} + fields.append(avro_field) return { 'type': 'record', 'name': beam_type.row_type.schema.id, - 'fields': [{ - 'name': field.name, 'type': beam_type_to_avro_type(field.type) - } for field in beam_type.row_type.schema.fields], + 'fields': fields, } else: raise ValueError(f"Unconvertable type: {beam_type}") diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 0f4920874ca2..c95fbb612592 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -90,7 +90,7 @@ def __init__(self, methodName='runTest'): "type": "record", "name": "User", "fields": [ - {"name": "name", "type": {"type": "string"}}, + {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] From 7be838ab464df3b8bcb4fa5af084e1428b34d81e Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Fri, 19 Apr 2024 12:57:01 -0400 Subject: [PATCH 14/15] unnest primitive types for arrays and maps too --- sdks/python/apache_beam/io/avroio.py | 39 ++++++++++++++++++---------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 6f03374286d6..ac68e60d2549 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -697,6 +697,26 @@ def beam_schema_to_avro_schema( schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema))) +def unnest_primitive_type(beam_type: schema_pb2.FieldType): + """unnests beam types that map to avro primitives or unions. + + if mapping to a avro primitive or a union, don't nest the field type + for complex types, like arrays, we need to nest the type. + Example: { 'type': 'string' } -> 'string' + { 'type': 'array', 'items': 'string' } + -> { 'type': 'array', 'items': 'string' } + + Args: + beam_type: the beam type to map to avro. + + Returns: + the converted avro type with the primitive or union type unnested. + """ + avro_type = beam_type_to_avro_type(beam_type) + return avro_type['type'] if beam_type.WhichOneof( + "type_info") == "atomic_type" else avro_type + + def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: type_info = beam_type.WhichOneof("type_info") if type_info == "atomic_type": @@ -708,12 +728,12 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: elif type_info == "array_type": return { 'type': 'array', - 'items': beam_type_to_avro_type(beam_type.array_type.element_type) + 'items': unnest_primitive_type(beam_type.array_type.element_type) } elif type_info == "iterable_type": return { 'type': 'array', - 'items': beam_type_to_avro_type(beam_type.iterable_type.element_type) + 'items': unnest_primitive_type(beam_type.iterable_type.element_type) } elif type_info == "map_type": if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: @@ -722,22 +742,15 @@ def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: f'found {beam_type}') return { 'type': 'map', - 'values': beam_type_to_avro_type(beam_type.map_type.element_type) + 'values': unnest_primitive_type(beam_type.map_type.element_type) } elif type_info == "row_type": - fields = [] - for field in beam_type.row_type.schema.fields: - avro_type = beam_type_to_avro_type(field.type) - # if mapping to a avro primitive or a union, don't nest the field type - # for complex types, like arrays, we need to nest the type - field_type = avro_type['type'] if field.type.WhichOneof( - "type_info") == "atomic_type" else avro_type - avro_field = {'name': field.name, 'type': field_type} - fields.append(avro_field) return { 'type': 'record', 'name': beam_type.row_type.schema.id, - 'fields': fields, + 'fields': [{ + 'name': field.name, 'type': unnest_primitive_type(field.type) + } for field in beam_type.row_type.schema.fields], } else: raise ValueError(f"Unconvertable type: {beam_type}") From 097005a9acfd0a468a91d794f0e7e48efbfb442d Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Mon, 22 Apr 2024 16:37:30 -0400 Subject: [PATCH 15/15] remove print stmt --- sdks/python/apache_beam/io/avroio.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index ac68e60d2549..8b7958a00b80 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -431,7 +431,6 @@ def expand(self, pcoll): "An explicit schema is required to write non-schema'd PCollections." ) from exn avro_schema = beam_schema_to_avro_schema(beam_schema) - print("HERE: ", avro_schema) records = pcoll | beam.Map( beam_row_to_avro_dict(avro_schema, beam_schema)) self._sink = self._sink_provider(avro_schema)