From 858cce17fbc7e0c460d478eda2deda508d07bf6e Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 8 Oct 2023 17:25:00 -0700 Subject: [PATCH] Fix Iceberg to Avro Schema Conversion: Fixed, Decimal, UUID --- pyiceberg/utils/schema_conversion.py | 14 +++++++++++--- tests/avro/test_file.py | 7 +++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pyiceberg/utils/schema_conversion.py b/pyiceberg/utils/schema_conversion.py index 74d0ae9ee7..d4b7aab4f1 100644 --- a/pyiceberg/utils/schema_conversion.py +++ b/pyiceberg/utils/schema_conversion.py @@ -48,6 +48,7 @@ TimeType, UUIDType, ) +from pyiceberg.utils.decimal import decimal_required_bytes logger = logging.getLogger(__name__) @@ -565,10 +566,17 @@ def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) - } def visit_fixed(self, fixed_type: FixedType) -> AvroType: - return {"type": "fixed", "size": len(fixed_type)} + return {"type": "fixed", "size": len(fixed_type), "name": f"fixed_{len(fixed_type)}"} def visit_decimal(self, decimal_type: DecimalType) -> AvroType: - return {"type": "bytes", "logicalType": "decimal", "precision": decimal_type.precision, "scale": decimal_type.scale} + return { + "type": "fixed", + "size": decimal_required_bytes(decimal_type.precision), + "logicalType": "decimal", + "precision": decimal_type.precision, + "scale": decimal_type.scale, + "name": f"decimal_{decimal_type.precision}_{decimal_type.scale}", + } def visit_boolean(self, boolean_type: BooleanType) -> AvroType: return "boolean" @@ -603,7 +611,7 @@ def visit_string(self, string_type: StringType) -> AvroType: return "string" def visit_uuid(self, uuid_type: UUIDType) -> AvroType: - return {"type": "fixed", "size": "16", "logicalType": "uuid"} + return {"type": "fixed", "size": 16, "logicalType": "uuid", "name": "uuid_fixed"} def visit_binary(self, binary_type: BinaryType) -> AvroType: return "bytes" diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py index e9dcc7eca1..e2717a4696 100644 --- a/tests/avro/test_file.py +++ b/tests/avro/test_file.py @@ -286,5 +286,12 @@ def __init__(self, *data: Any, **named_data: Any) -> None: it = iter(avro_reader) avro_entry = next(it) + # read with fastavro + with open(tmp_avro_file, "rb") as fo: + r = reader(fo=fo) + it_fastavro = iter(r) + avro_entry_read_with_fastavro = list(next(it_fastavro).values()) + for idx, field in enumerate(all_primitives_schema.as_struct()): assert record[idx] == avro_entry[idx], f"Invalid {field}" + assert record[idx] == avro_entry_read_with_fastavro[idx], f"Invalid {field} read with fastavro"