From 89cecbe740aeda65af3bbee007f14bdc0a8c1946 Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Sat, 20 Jan 2024 11:02:32 +0100 Subject: [PATCH 1/7] Set Glue Table Information when creating/updating tables Resolves #216. This PR adds information about the schema (on update/create) and location (create) of the table to Glue, enabling both an improved UI experience as well as querying with Athena. --- pyiceberg/catalog/glue.py | 117 ++++++++++++++++++++++++++++++++++++- tests/catalog/test_glue.py | 43 +++++++++++++- 2 files changed, 156 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 6cf9462b71..df13a7adf5 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -18,6 +18,7 @@ from typing import ( Any, + Dict, List, Optional, Set, @@ -28,6 +29,7 @@ import boto3 from mypy_boto3_glue.client import GlueClient from mypy_boto3_glue.type_defs import ( + ColumnTypeDef, DatabaseInputTypeDef, DatabaseTypeDef, StorageDescriptorTypeDef, @@ -62,9 +64,29 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata -from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.metadata import TableMetadataCommonFields, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT +from pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, + IcebergType, + IntegerType, + ListType, + LongType, + MapType, + NestedField, + StringType, + StructType, + TimestampType, + TimeType, + UUIDType, +) # If Glue should skip archiving an old table version when creating a new version in a commit. 
By # default, Glue archives all old table versions after an UpdateTable call, but Glue has a default @@ -73,6 +95,10 @@ GLUE_SKIP_ARCHIVE = "glue.skip-archive" GLUE_SKIP_ARCHIVE_DEFAULT = True +ICEBERG_FIELD_ID = "iceberg.field.id" +ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional" +ICEBERG_FIELD_CURRENT = "iceberg.field.current" + def _construct_parameters( metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None @@ -84,10 +110,92 @@ def _construct_parameters( return new_parameters +def _type_to_glue_type_string(input_type: IcebergType) -> str: + if isinstance(input_type, BooleanType): + return "boolean" + if isinstance(input_type, IntegerType): + return "int" + if isinstance(input_type, LongType): + return "bigint" + if isinstance(input_type, FloatType): + return "float" + if isinstance(input_type, DoubleType): + return "double" + if isinstance(input_type, DateType): + return "date" + if isinstance( + input_type, + ( + TimeType, + StringType, + UUIDType, + ), + ): + return "string" + if isinstance(input_type, TimestampType): + return "timestamp" + if isinstance( + input_type, + ( + FixedType, + BinaryType, + ), + ): + return "binary" + if isinstance(input_type, DecimalType): + return f"decimal({input_type.precision},{input_type.scale})" + if isinstance(input_type, StructType): + name_to_type = ",".join(f"{f.name}:{_type_to_glue_type_string(f.field_type)}" for f in input_type.fields) + return f"struct<{name_to_type}>" + if isinstance(input_type, ListType): + return f"array<{_type_to_glue_type_string(input_type.element_type)}>" + if isinstance(input_type, MapType): + return f"map<{_type_to_glue_type_string(input_type.key_type)},{_type_to_glue_type_string(input_type.value_type)}>" + + raise ValueError(f"Unknown Type {input_type}") + + +def _to_columns(metadata: TableMetadataCommonFields) -> List[ColumnTypeDef]: + results: Dict[str, ColumnTypeDef] = {} + + def _append_to_results(field: NestedField, is_current: 
bool) -> None: + if field.name in results: + return + + results[field.name] = cast( + ColumnTypeDef, + { + "Name": field.name, + "Type": _type_to_glue_type_string(field.field_type), + "Parameters": { + ICEBERG_FIELD_ID: str(field.field_id), + ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(), + ICEBERG_FIELD_CURRENT: str(is_current).lower(), + }, + }, + ) + if field.doc: + results[field.name]["Comment"] = field.doc + + if current_schema := metadata.schema_by_id(metadata.current_schema_id): + for field in current_schema.columns: + _append_to_results(field, True) + + for schema in metadata.schemas: + if schema.schema_id == metadata.current_schema_id: + continue + for field in schema.columns: + _append_to_results(field, False) + + return list(results.values()) + + def _construct_table_input( table_name: str, metadata_location: str, properties: Properties, + metadata: TableMetadataCommonFields, + location: Optional[str] = None, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None, ) -> TableInputTypeDef: @@ -95,8 +203,12 @@ def _construct_table_input( "Name": table_name, "TableType": EXTERNAL_TABLE, "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location), + "StorageDescriptor": {"Columns": _to_columns(metadata)}, } + if location: + table_input["StorageDescriptor"]["Location"] = location + if "Description" in properties: table_input["Description"] = properties["Description"] @@ -258,7 +370,7 @@ def create_table( io = load_file_io(properties=self.properties, location=metadata_location) self._write_metadata(metadata, io, metadata_location) - table_input = _construct_table_input(table_name, metadata_location, properties) + table_input = _construct_table_input(table_name, metadata_location, properties, metadata, location) database_name, table_name = self.identifier_to_database_and_table(identifier) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) @@ 
-322,6 +434,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons table_name=table_name, metadata_location=new_metadata_location, properties=current_table.properties, + metadata=updated_metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index bf6d11784f..99e2d42e93 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -38,7 +38,12 @@ @mock_glue def test_create_table_with_database_location( - _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str + _glue: boto3.client, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, ) -> None: catalog_name = "glue" identifier = (database_name, table_name) @@ -49,6 +54,22 @@ def test_create_table_with_database_location( assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + # Ensure schema is also pushed to Glue + table_info = _glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + storage_descriptor = table_info["Table"]["StorageDescriptor"] + columns = storage_descriptor["Columns"] + assert len(columns) == len(table_schema_nested.fields) + assert columns[0] == { + "Name": "foo", + "Type": "string", + "Parameters": {"iceberg.field.id": "1", "iceberg.field.optional": "true", "iceberg.field.current": "true"}, + } + + assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}" + @mock_glue def test_create_table_with_default_warehouse( @@ -524,7 +545,12 @@ def test_passing_profile_name() -> None: @mock_glue def test_commit_table_update_schema( - _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str + _glue: boto3.client, + 
_bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, ) -> None: catalog_name = "glue" identifier = (database_name, table_name) @@ -554,6 +580,19 @@ def test_commit_table_update_schema( assert new_schema == update._apply() assert new_schema.find_field("b").field_type == IntegerType() + # Ensure schema is also pushed to Glue + table_info = _glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + columns = table_info["Table"]["StorageDescriptor"]["Columns"] + assert len(columns) == len(table_schema_nested.fields) + 1 + assert columns[-1] == { + "Name": "b", + "Type": "int", + "Parameters": {"iceberg.field.id": "18", "iceberg.field.optional": "true", "iceberg.field.current": "true"}, + } + @mock_glue def test_commit_table_properties( From 05792086d503a0ab5aa96d4ab858f2b9b04c4c8e Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Mon, 22 Jan 2024 08:54:02 +0100 Subject: [PATCH 2/7] Always include Location from metadata We can drop the additional argument because it is always there in the metadata. 
--- pyiceberg/catalog/glue.py | 11 +++++------ tests/catalog/test_glue.py | 4 +++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index df13a7adf5..59ddd975b5 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -195,7 +195,6 @@ def _construct_table_input( metadata_location: str, properties: Properties, metadata: TableMetadataCommonFields, - location: Optional[str] = None, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None, ) -> TableInputTypeDef: @@ -203,12 +202,12 @@ def _construct_table_input( "Name": table_name, "TableType": EXTERNAL_TABLE, "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location), - "StorageDescriptor": {"Columns": _to_columns(metadata)}, + "StorageDescriptor": { + "Columns": _to_columns(metadata), + "Location": metadata.location, + }, } - if location: - table_input["StorageDescriptor"]["Location"] = location - if "Description" in properties: table_input["Description"] = properties["Description"] @@ -370,7 +369,7 @@ def create_table( io = load_file_io(properties=self.properties, location=metadata_location) self._write_metadata(metadata, io, metadata_location) - table_input = _construct_table_input(table_name, metadata_location, properties, metadata, location) + table_input = _construct_table_input(table_name, metadata_location, properties, metadata) database_name, table_name = self.identifier_to_database_and_table(identifier) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 99e2d42e93..b1f1371a04 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -585,13 +585,15 @@ def test_commit_table_update_schema( DatabaseName=database_name, Name=table_name, ) - columns = table_info["Table"]["StorageDescriptor"]["Columns"] + storage_descriptor = 
table_info["Table"]["StorageDescriptor"] + columns = storage_descriptor["Columns"] assert len(columns) == len(table_schema_nested.fields) + 1 assert columns[-1] == { "Name": "b", "Type": "int", "Parameters": {"iceberg.field.id": "18", "iceberg.field.optional": "true", "iceberg.field.current": "true"}, } + assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}" @mock_glue From 5e37da99fdc8bc077b8d00553985f4be044820e6 Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Mon, 22 Jan 2024 08:55:08 +0100 Subject: [PATCH 3/7] Change typing to `TableMetadata` --- pyiceberg/catalog/glue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 59ddd975b5..3fdb2829bf 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -64,7 +64,7 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata -from pyiceberg.table.metadata import TableMetadataCommonFields, new_table_metadata +from pyiceberg.table.metadata import TableMetadata, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import ( @@ -155,7 +155,7 @@ def _type_to_glue_type_string(input_type: IcebergType) -> str: raise ValueError(f"Unknown Type {input_type}") -def _to_columns(metadata: TableMetadataCommonFields) -> List[ColumnTypeDef]: +def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]: results: Dict[str, ColumnTypeDef] = {} def _append_to_results(field: NestedField, is_current: bool) -> None: @@ -194,7 +194,7 @@ def _construct_table_input( table_name: str, metadata_location: str, properties: Properties, - metadata: TableMetadataCommonFields, + metadata: TableMetadata, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None, ) 
-> TableInputTypeDef: From defdac48b5ff236e2464a7f4666feb3d07e66a88 Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Mon, 22 Jan 2024 09:06:39 +0100 Subject: [PATCH 4/7] Use SchemaVisitor to traverse Glue types/schema --- pyiceberg/catalog/glue.py | 87 ++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 46 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 3fdb2829bf..386959282a 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -61,7 +61,7 @@ ) from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec -from pyiceberg.schema import Schema +from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata from pyiceberg.table.metadata import TableMetadata, new_table_metadata @@ -75,12 +75,12 @@ DoubleType, FixedType, FloatType, - IcebergType, IntegerType, ListType, LongType, MapType, NestedField, + PrimitiveType, StringType, StructType, TimestampType, @@ -110,49 +110,44 @@ def _construct_parameters( return new_parameters -def _type_to_glue_type_string(input_type: IcebergType) -> str: - if isinstance(input_type, BooleanType): - return "boolean" - if isinstance(input_type, IntegerType): - return "int" - if isinstance(input_type, LongType): - return "bigint" - if isinstance(input_type, FloatType): - return "float" - if isinstance(input_type, DoubleType): - return "double" - if isinstance(input_type, DateType): - return "date" - if isinstance( - input_type, - ( - TimeType, - StringType, - UUIDType, - ), - ): - return "string" - if isinstance(input_type, TimestampType): - return "timestamp" - if isinstance( - input_type, - ( - FixedType, - BinaryType, - ), - ): - return "binary" - if isinstance(input_type, DecimalType): - return f"decimal({input_type.precision},{input_type.scale})" - if 
isinstance(input_type, StructType): - name_to_type = ",".join(f"{f.name}:{_type_to_glue_type_string(f.field_type)}" for f in input_type.fields) - return f"struct<{name_to_type}>" - if isinstance(input_type, ListType): - return f"array<{_type_to_glue_type_string(input_type.element_type)}>" - if isinstance(input_type, MapType): - return f"map<{_type_to_glue_type_string(input_type.key_type)},{_type_to_glue_type_string(input_type.value_type)}>" - - raise ValueError(f"Unknown Type {input_type}") +GLUE_PRIMITIVE_TYPES = { + BooleanType: "boolean", + IntegerType: "int", + LongType: "bigint", + FloatType: "float", + DoubleType: "double", + DateType: "date", + TimeType: "string", + StringType: "string", + UUIDType: "string", + TimestampType: "timestamp", + FixedType: "binary", + BinaryType: "binary", +} + + +class SchemaToGlueType(SchemaVisitor[str]): + def schema(self, schema: Schema, struct_result: str) -> str: + return struct_result + + def struct(self, struct: StructType, field_results: List[str]) -> str: + return f"struct<{','.join(field_results)}>" + + def field(self, field: NestedField, field_result: str) -> str: + return f"{field.name}:{field_result}" + + def list(self, list_type: ListType, element_result: str) -> str: + return f"array<{element_result}>" + + def map(self, map_type: MapType, key_result: str, value_result: str) -> str: + return f"map<{key_result},{value_result}>" + + def primitive(self, primitive: PrimitiveType) -> str: + if isinstance(primitive, DecimalType): + return f"decimal({primitive.precision},{primitive.scale})" + if (primitive_type := type(primitive)) not in GLUE_PRIMITIVE_TYPES: + raise ValueError(f"Unknown primitive type: {primitive}") + return GLUE_PRIMITIVE_TYPES[primitive_type] def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]: @@ -166,7 +161,7 @@ def _append_to_results(field: NestedField, is_current: bool) -> None: ColumnTypeDef, { "Name": field.name, - "Type": _type_to_glue_type_string(field.field_type), + "Type": 
visit(field.field_type, SchemaToGlueType()), "Parameters": { ICEBERG_FIELD_ID: str(field.field_id), ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(), From 40ab6e617c96712d5020e06b741fdbbd1963e75a Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Mon, 22 Jan 2024 11:33:47 +0100 Subject: [PATCH 5/7] Add integration tests for glue/Athena This covers a few use cases, including updating the schema and reading real data back from Athena. --- tests/catalog/integration_test_glue.py | 162 ++++++++++++++++++++++++- 1 file changed, 159 insertions(+), 3 deletions(-) diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 24401cae39..a56e4c6aaa 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -15,9 +15,12 @@ # specific language governing permissions and limitations # under the License. -from typing import Generator, List +import time +from typing import Any, Dict, Generator, List +from uuid import uuid4 import boto3 +import pyarrow as pa import pytest from botocore.exceptions import ClientError @@ -30,6 +33,7 @@ NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.schema import Schema from pyiceberg.types import IntegerType from tests.conftest import clean_up, get_bucket_name, get_s3_path @@ -52,8 +56,62 @@ def fixture_test_catalog() -> Generator[Catalog, None, None]: clean_up(test_catalog) +class AthenaQueryHelper: + _athena_client: boto3.client + _s3_resource: boto3.resource + _output_bucket: str + _output_path: str + + def __init__(self) -> None: + self._s3_resource = boto3.resource("s3") + self._athena_client = boto3.client("athena") + self._output_bucket = get_bucket_name() + self._output_path = f"athena_results_{uuid4()}" + + def get_query_results(self, query: str) -> List[Dict[str, Any]]: + query_execution_id = self._athena_client.start_query_execution( + QueryString=query, 
ResultConfiguration={"OutputLocation": f"s3://{self._output_bucket}/{self._output_path}"} + )["QueryExecutionId"] + + while True: + result = self._athena_client.get_query_execution(QueryExecutionId=query_execution_id)["QueryExecution"]["Status"] + query_status = result["State"] + assert query_status not in [ + "FAILED", + "CANCELLED", + ], f""" + Athena query with the string failed or was cancelled: + Query: {query} + Status: {query_status} + Reason: {result["StateChangeReason"]}""" + + if query_status not in ["QUEUED", "RUNNING"]: + break + time.sleep(0.5) + + # No pagination for now, assume that we are not doing large queries + return self._athena_client.get_query_results(QueryExecutionId=query_execution_id)["ResultSet"]["Rows"] + + def clean_up(self) -> None: + bucket = self._s3_resource.Bucket(self._output_bucket) + for obj in bucket.objects.filter(Prefix=f"{self._output_path}/"): + self._s3_resource.Object(bucket.name, obj.key).delete() + + +@pytest.fixture(name="athena", scope="module") +def fixture_athena_helper() -> Generator[AthenaQueryHelper, None, None]: + query_helper = AthenaQueryHelper() + yield query_helper + query_helper.clean_up() + + def test_create_table( - test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, table_name: str, database_name: str + test_catalog: Catalog, + s3: boto3.client, + table_schema_nested: Schema, + table_name: str, + database_name: str, + athena: AthenaQueryHelper, ) -> None: identifier = (database_name, table_name) test_catalog.create_namespace(database_name) @@ -64,6 +122,48 @@ def test_create_table( s3.head_object(Bucket=get_bucket_name(), Key=metadata_location) assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + table.append( + pa.Table.from_pylist( + [ + { + "foo": "foo_val", + "bar": 1, + "baz": False, + "qux": ["x", "y"], + "quux": {"key": {"subkey": 2}}, + "location": [{"latitude": 1.1}], + "person": {"name": "some_name", "age": 23}, + } + ], + 
schema=schema_to_pyarrow(table.schema()), + ), + ) + + assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [ + { + "Data": [ + {"VarCharValue": "foo"}, + {"VarCharValue": "bar"}, + {"VarCharValue": "baz"}, + {"VarCharValue": "qux"}, + {"VarCharValue": "quux"}, + {"VarCharValue": "location"}, + {"VarCharValue": "person"}, + ] + }, + { + "Data": [ + {"VarCharValue": "foo_val"}, + {"VarCharValue": "1"}, + {"VarCharValue": "false"}, + {"VarCharValue": "[x, y]"}, + {"VarCharValue": "{key={subkey=2}}"}, + {"VarCharValue": "[{latitude=1.1, longitude=null}]"}, + {"VarCharValue": "{name=some_name, age=23}"}, + ] + }, + ] + def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None: identifier = (database_name, table_name) @@ -269,7 +369,7 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str) def test_commit_table_update_schema( - test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str + test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str, athena: AthenaQueryHelper ) -> None: identifier = (database_name, table_name) test_catalog.create_namespace(namespace=database_name) @@ -279,6 +379,20 @@ def test_commit_table_update_schema( assert test_catalog._parse_metadata_version(table.metadata_location) == 0 assert original_table_metadata.current_schema_id == 0 + assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [ + { + "Data": [ + {"VarCharValue": "foo"}, + {"VarCharValue": "bar"}, + {"VarCharValue": "baz"}, + {"VarCharValue": "qux"}, + {"VarCharValue": "quux"}, + {"VarCharValue": "location"}, + {"VarCharValue": "person"}, + ] + } + ] + transaction = table.transaction() update = transaction.update_schema() update.add_column(path="b", field_type=IntegerType()) @@ -295,6 +409,48 @@ def test_commit_table_update_schema( assert new_schema == update._apply() assert 
new_schema.find_field("b").field_type == IntegerType() + table.append( + pa.Table.from_pylist( + [ + { + "foo": "foo_val", + "bar": 1, + "location": [{"latitude": 1.1}], + "person": {"name": "some_name", "age": 23}, + "b": 2, + } + ], + schema=schema_to_pyarrow(new_schema), + ), + ) + + assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [ + { + "Data": [ + {"VarCharValue": "foo"}, + {"VarCharValue": "bar"}, + {"VarCharValue": "baz"}, + {"VarCharValue": "qux"}, + {"VarCharValue": "quux"}, + {"VarCharValue": "location"}, + {"VarCharValue": "person"}, + {"VarCharValue": "b"}, + ] + }, + { + "Data": [ + {"VarCharValue": "foo_val"}, + {"VarCharValue": "1"}, + {}, + {"VarCharValue": "[]"}, + {"VarCharValue": "{}"}, + {"VarCharValue": "[{latitude=1.1, longitude=null}]"}, + {"VarCharValue": "{name=some_name, age=23}"}, + {"VarCharValue": "2"}, + ] + }, + ] + def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: identifier = (database_name, table_name) From 2877c69269521233f7ea6350b78bf222bff39ec7 Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Tue, 23 Jan 2024 07:50:09 +0100 Subject: [PATCH 6/7] Rename schema visitor to indicate direction of conversion --- pyiceberg/catalog/glue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 386959282a..cf29ef74ef 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -126,7 +126,7 @@ def _construct_parameters( } -class SchemaToGlueType(SchemaVisitor[str]): +class IcebergSchemaToGlueType(SchemaVisitor[str]): def schema(self, schema: Schema, struct_result: str) -> str: return struct_result @@ -161,7 +161,7 @@ def _append_to_results(field: NestedField, is_current: bool) -> None: ColumnTypeDef, { "Name": field.name, - "Type": visit(field.field_type, SchemaToGlueType()), + "Type": visit(field.field_type, 
IcebergSchemaToGlueType()), "Parameters": { ICEBERG_FIELD_ID: str(field.field_id), ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(), From b3badb4ceff631661f13706dce60dc8aaa4734e4 Mon Sep 17 00:00:00 2001 From: Michael Marino Date: Wed, 24 Jan 2024 08:02:46 +0100 Subject: [PATCH 7/7] Make Schema Visitor internal --- pyiceberg/catalog/glue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index cf29ef74ef..bccbfa4f0a 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -126,7 +126,7 @@ def _construct_parameters( } -class IcebergSchemaToGlueType(SchemaVisitor[str]): +class _IcebergSchemaToGlueType(SchemaVisitor[str]): def schema(self, schema: Schema, struct_result: str) -> str: return struct_result @@ -161,7 +161,7 @@ def _append_to_results(field: NestedField, is_current: bool) -> None: ColumnTypeDef, { "Name": field.name, - "Type": visit(field.field_type, IcebergSchemaToGlueType()), + "Type": visit(field.field_type, _IcebergSchemaToGlueType()), "Parameters": { ICEBERG_FIELD_ID: str(field.field_id), ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(),