From 83dc91641b2e8fc7e0822a7694e56d81d1d1d49d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 3 Dec 2023 04:04:13 +0100 Subject: [PATCH 1/4] Fix downgrading of schema --- pyiceberg/table/__init__.py | 21 +++++++++++++++------ tests/test_integration_schema.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b0f2bcd1c4..720bdd044f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1412,13 +1412,22 @@ def commit(self) -> None: """Apply the pending changes and commit.""" new_schema = self._apply() - if new_schema != self._schema: - last_column_id = max(self._table.metadata.last_column_id, new_schema.highest_field_id) - updates = ( - AddSchemaUpdate(schema=new_schema, last_column_id=last_column_id), - SetCurrentSchemaUpdate(schema_id=-1), - ) + existing_schema_id: Optional[int] = None + for schema in self._table.metadata.schemas: + if new_schema.fields == schema.fields: + existing_schema_id = schema.schema_id + + # Check if it is different current schema ID + if existing_schema_id != self._table.schema().schema_id: requirements = (AssertCurrentSchemaId(current_schema_id=self._schema.schema_id),) + if existing_schema_id is None: + last_column_id = max(self._table.metadata.last_column_id, new_schema.highest_field_id) + updates = ( + AddSchemaUpdate(schema=new_schema, last_column_id=last_column_id), + SetCurrentSchemaUpdate(schema_id=-1), + ) + else: + updates = (SetCurrentSchemaUpdate(schema_id=existing_schema_id),) # type: ignore if self._transaction is not None: self._transaction._append_updates(*updates) # pylint: disable=W0212 diff --git a/tests/test_integration_schema.py b/tests/test_integration_schema.py index f0ccb1b0e8..d844e6d6c0 100644 --- a/tests/test_integration_schema.py +++ b/tests/test_integration_schema.py @@ -340,6 +340,34 @@ def test_no_changes_empty_commit(simple_table: Table, table_schema_simple: Schem assert simple_table.schema() == table_schema_simple +@pytest.mark.integration +def test_revert_changes(simple_table: Table, table_schema_simple: Schema) -> None: + with simple_table.update_schema() as update: + update.add_column(path="data", field_type=IntegerType(), required=False) + + with simple_table.update_schema(allow_incompatible_changes=True) as update: + update.delete_column(path="data") + + assert simple_table.schemas() == { + 0: Schema( + NestedField(field_id=1, name='foo', field_type=StringType(), required=False), + NestedField(field_id=2, name='bar', field_type=IntegerType(), required=True), + NestedField(field_id=3, name='baz', field_type=BooleanType(), required=False), + schema_id=0, + identifier_field_ids=[2], + ), + 1: Schema( + NestedField(field_id=1, name='foo', field_type=StringType(), required=False), + NestedField(field_id=2, name='bar', field_type=IntegerType(), required=True), + NestedField(field_id=3, name='baz', field_type=BooleanType(), required=False), + NestedField(field_id=4, name='data', field_type=IntegerType(), required=False), + schema_id=1, + identifier_field_ids=[2], + ), + } + assert simple_table.schema().schema_id == 0 + + @pytest.mark.integration def test_delete_field(simple_table: Table) -> None: with simple_table.update_schema() as schema_update: From a165ba727adde8d62a7424f12e1d2afa74112e6d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 3 Dec 2023 04:37:53 +0100 Subject: [PATCH 2/4] Fix comparison --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 720bdd044f..1beb800ce7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1414,7 +1414,7 @@ def commit(self) -> None: existing_schema_id: Optional[int] = None for schema in self._table.metadata.schemas: - if new_schema.fields == schema.fields: + if new_schema == schema: existing_schema_id = schema.schema_id # Check if it is different current schema ID From e854829ab061ea1eb646ba60575dc83b6d2bb79b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 4 Dec 2023 04:25:35 +0100 Subject: [PATCH 3/4] Add suggestion --- pyiceberg/table/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 1beb800ce7..f6b89370ed 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1412,10 +1412,9 @@ def commit(self) -> None: """Apply the pending changes and commit.""" new_schema = self._apply() - existing_schema_id: Optional[int] = None - for schema in self._table.metadata.schemas: - if new_schema == schema: - existing_schema_id = schema.schema_id + existing_schema_id = existing_schema_id = next( + (schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None + ) # Check if it is different current schema ID if existing_schema_id != self._table.schema().schema_id: From ea28e966099264362cccee3e7bc7132c0c3a5996 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 4 Dec 2023 04:41:36 +0100 Subject: [PATCH 4/4] Remove double assignment --- pyiceberg/table/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f6b89370ed..6fbde32cc7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1412,9 +1412,7 @@ def commit(self) -> None: """Apply the pending changes and commit.""" new_schema = self._apply() - existing_schema_id = existing_schema_id = next( - (schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None - ) + existing_schema_id = next((schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None) # Check if it is different current schema ID if existing_schema_id != self._table.schema().schema_id: