From f92c298180573d13fc9acbd3d80b732aa4bdd73b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 6 Oct 2023 22:57:02 +0200 Subject: [PATCH 1/3] Fix the TableIdentifier --- pyiceberg/catalog/rest.py | 6 +++++- pyiceberg/table/__init__.py | 26 ++++++++++++++++++++++++-- tests/test_integration.py | 10 +++++----- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 0023e18984..ad2c510d03 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -62,6 +62,7 @@ CommitTableRequest, CommitTableResponse, Table, + TableIdentifier, TableMetadata, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -301,7 +302,10 @@ def _fetch_config(self) -> None: # Update URI based on overrides self.uri = config[URI] - def _split_identifier_for_path(self, identifier: Union[str, Identifier]) -> Properties: + def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties: + if isinstance(identifier, TableIdentifier): + return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name} + identifier_tuple = self.identifier_to_tuple(identifier) if len(identifier_tuple) <= 1: raise NoSuchTableError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}") diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6e71a40c2d..ebdca60939 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -75,6 +75,7 @@ from pyiceberg.typedef import ( EMPTY_DICT, IcebergBaseModel, + IcebergRootModel, Identifier, KeyDefaultDict, Properties, @@ -403,8 +404,25 @@ class AssertDefaultSortOrderId(TableRequirement): default_sort_order_id: int = Field(..., alias="default-sort-order-id") +class Namespace(IcebergRootModel[List[str]]): + """Reference to one or more levels of a namespace.""" + + root: List[str] = Field( + ..., + description='Reference to one or more levels of a namespace', + example=['accounting', 'tax'], + ) + + +class TableIdentifier(IcebergBaseModel): + """Fully Qualified identifier to a table.""" + + namespace: Namespace + name: str + + class CommitTableRequest(IcebergBaseModel): - identifier: Identifier = Field() + identifier: TableIdentifier = Field() requirements: Tuple[SerializeAsAny[TableRequirement], ...] = Field(default_factory=tuple) updates: Tuple[SerializeAsAny[TableUpdate], ...] = Field(default_factory=tuple) @@ -535,7 +553,11 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: response = self.catalog._commit_table( # pylint: disable=W0212 - CommitTableRequest(identifier=self.identifier[1:], updates=updates, requirements=requirements) + CommitTableRequest( + identifier=TableIdentifier(namespace=self.identifier[:-1], name=self.identifier[-1]), + updates=updates, + requirements=requirements, + ) ) # pylint: disable=W0212 self.metadata = response.metadata self.metadata_location = response.metadata_location diff --git a/tests/test_integration.py b/tests/test_integration.py index 297749b1b7..20831e8afc 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -104,25 +104,25 @@ def table(catalog: Catalog) -> Table: @pytest.mark.integration def test_table_properties(table: Table) -> None: - assert table.properties == {} + assert table.properties == {'write.parquet.compression-codec': 'zstd'} with table.transaction() as transaction: transaction.set_properties(abc="🤪") - assert table.properties == {"abc": "🤪"} + assert table.properties == {"abc": "🤪", 'write.parquet.compression-codec': 'zstd'} with table.transaction() as transaction: transaction.remove_properties("abc") - assert table.properties == {} + assert table.properties == {'write.parquet.compression-codec': 'zstd'} table = table.transaction().set_properties(abc="def").commit_transaction() - assert table.properties == {"abc": "def"} + assert table.properties == {"abc": "def", 'write.parquet.compression-codec': 'zstd'} table = table.transaction().remove_properties("abc").commit_transaction() - assert table.properties == {} + assert table.properties == {'write.parquet.compression-codec': 'zstd'} @pytest.fixture() From 0f50727217fb2280e7ecdabedb1ce9a48aa46882 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 6 Oct 2023 23:27:36 +0200 Subject: [PATCH 2/3] Make the rest of the tests happy --- tests/catalog/test_base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index da121f6114..1078dd1b0a 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -46,8 +46,10 @@ AddSchemaUpdate, CommitTableRequest, CommitTableResponse, + Namespace, SetCurrentSchemaUpdate, Table, + TableIdentifier, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -119,8 +121,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons for update in table_request.updates: if isinstance(update, AddSchemaUpdate): add_schema_update: AddSchemaUpdate = update - identifier = Catalog.identifier_to_tuple(table_request.identifier) - table = self.__tables[("com", *identifier)] + identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) + table = self.__tables[identifier] new_metadata = new_table_metadata( add_schema_update.schema_, table.metadata.partition_specs[0], @@ -528,7 +530,7 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: # When response = given_table.catalog._commit_table( # pylint: disable=W0212 CommitTableRequest( - identifier=given_table.identifier[1:], + identifier=TableIdentifier(namespace=Namespace(given_table.identifier[:-1]), name=given_table.identifier[-1]), updates=[ AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), SetCurrentSchemaUpdate(schema_id=-1), From f22e6faa35a6b44b3c2580ae092aa8f5b9a1a10e Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 7 Oct 2023 09:01:06 +0200 Subject: [PATCH 3/3] Add default properties --- tests/test_integration.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index 20831e8afc..6d777cf99b 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -46,6 +46,8 @@ TimestampType, ) +DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'} + @pytest.fixture() def catalog() -> Catalog: @@ -104,25 +106,25 @@ def table(catalog: Catalog) -> Table: @pytest.mark.integration def test_table_properties(table: Table) -> None: - assert table.properties == {'write.parquet.compression-codec': 'zstd'} + assert table.properties == DEFAULT_PROPERTIES with table.transaction() as transaction: transaction.set_properties(abc="🤪") - assert table.properties == {"abc": "🤪", 'write.parquet.compression-codec': 'zstd'} + assert table.properties == dict(**{"abc": "🤪"}, **DEFAULT_PROPERTIES) with table.transaction() as transaction: transaction.remove_properties("abc") - assert table.properties == {'write.parquet.compression-codec': 'zstd'} + assert table.properties == DEFAULT_PROPERTIES table = table.transaction().set_properties(abc="def").commit_transaction() - assert table.properties == {"abc": "def", 'write.parquet.compression-codec': 'zstd'} + assert table.properties == dict(**{"abc": "def"}, **DEFAULT_PROPERTIES) table = table.transaction().remove_properties("abc").commit_transaction() - assert table.properties == {'write.parquet.compression-codec': 'zstd'} + assert table.properties == DEFAULT_PROPERTIES @pytest.fixture()