diff --git a/dev/provision.py b/dev/provision.py index 56e3459edd..b75030f8a3 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -279,3 +279,25 @@ (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l'); """ ) + +# There is an issue with CREATE OR REPLACE +# https://github.com/apache/iceberg/issues/8756 +spark.sql( + """ +DROP TABLE IF EXISTS default.test_table_version +""" +) + +spark.sql( + """ +CREATE TABLE default.test_table_version ( + dt date, + number integer, + letter string +) +USING iceberg +TBLPROPERTIES ( + 'format-version'='1' +); +""" +) diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf index 28f93b15a6..56c345432a 100644 --- a/dev/spark-defaults.conf +++ b/dev/spark-defaults.conf @@ -20,7 +20,7 @@ spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.demo.type rest spark.sql.catalog.demo.uri http://rest:8181 spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO -spark.sql.catalog.demo.warehouse s3a://warehouse/wh/ +spark.sql.catalog.demo.warehouse s3://warehouse/wh/ spark.sql.catalog.demo.s3.endpoint http://minio:9000 spark.sql.defaultCatalog demo spark.eventLog.enabled true diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ebdca60939..c171b39c26 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -166,7 +166,7 @@ def _append_requirements(self, *new_requirements: TableRequirement) -> Transacti self._requirements = self._requirements + new_requirements return self - def set_table_version(self, format_version: Literal[1, 2]) -> Transaction: + def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction: """Set the table to a certain version. Args: @@ -175,7 +175,15 @@ def set_table_version(self, format_version: Literal[1, 2]) -> Transaction: Returns: The alter table builder. """ - raise NotImplementedError("Not yet implemented") + if format_version not in {1, 2}: + raise ValueError(f"Unsupported table format version: {format_version}") + + if format_version < self._table.metadata.format_version: + raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}") + if format_version > self._table.metadata.format_version: + return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version)) + else: + return self def set_properties(self, **updates: str) -> Transaction: """Set properties. @@ -482,6 +490,10 @@ def scan( limit=limit, ) + @property + def format_version(self) -> Literal[1, 2]: + return self.metadata.format_version + def schema(self) -> Schema: """Return the schema for this table.""" return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id) diff --git a/tests/test_integration.py b/tests/test_integration.py index 6d777cf99b..6e874b68fa 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -83,6 +83,11 @@ def table_test_all_types(catalog: Catalog) -> Table: return catalog.load_table("default.test_all_types") +@pytest.fixture() +def table_test_table_version(catalog: Catalog) -> Table: + return catalog.load_table("default.test_table_version") + + TABLE_NAME = ("default", "t1") @@ -366,3 +371,28 @@ def test_scan_tag(test_positional_mor_deletes: Table) -> None: def test_scan_branch(test_positional_mor_deletes: Table) -> None: arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] + + +@pytest.mark.integration +def test_upgrade_table_version(table_test_table_version: Table) -> None: + assert table_test_table_version.format_version == 1 + + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=1) + + assert table_test_table_version.format_version == 1 + + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=2) + + assert table_test_table_version.format_version == 2 + + with pytest.raises(ValueError) as e: # type: ignore + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=1) + assert "Cannot downgrade v2 table to v1" in str(e.value) + + with pytest.raises(ValueError) as e: + with table_test_table_version.transaction() as transaction: + transaction.upgrade_table_version(format_version=3) + assert "Unsupported table format version: 3" in str(e.value)