Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,25 @@
(CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
"""
)

# There is an issue with CREATE OR REPLACE
# https://github.com/apache/iceberg/issues/8756
spark.sql(
"""
DROP TABLE IF EXISTS default.test_table_version
"""
)

spark.sql(
"""
CREATE TABLE default.test_table_version (
dt date,
number integer,
letter string
)
USING iceberg
TBLPROPERTIES (
'format-version'='1'
);
"""
)
2 changes: 1 addition & 1 deletion dev/spark-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo.type rest
spark.sql.catalog.demo.uri http://rest:8181
spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.demo.warehouse s3a://warehouse/wh/
spark.sql.catalog.demo.warehouse s3://warehouse/wh/
spark.sql.catalog.demo.s3.endpoint http://minio:9000
spark.sql.defaultCatalog demo
spark.eventLog.enabled true
Expand Down
16 changes: 14 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def _append_requirements(self, *new_requirements: TableRequirement) -> Transacti
self._requirements = self._requirements + new_requirements
return self

def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction:
"""Set the table to a certain version.

Args:
Expand All @@ -175,7 +175,15 @@ def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
Returns:
The alter table builder.
"""
raise NotImplementedError("Not yet implemented")
if format_version not in {1, 2}:
raise ValueError(f"Unsupported table format version: {format_version}")

if format_version < self._table.metadata.format_version:
raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
if format_version > self._table.metadata.format_version:
return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
else:
return self

def set_properties(self, **updates: str) -> Transaction:
"""Set properties.
Expand Down Expand Up @@ -482,6 +490,10 @@ def scan(
limit=limit,
)

@property
def format_version(self) -> Literal[1, 2]:
return self.metadata.format_version

def schema(self) -> Schema:
"""Return the schema for this table."""
return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)
Expand Down
30 changes: 30 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ def table_test_all_types(catalog: Catalog) -> Table:
return catalog.load_table("default.test_all_types")


@pytest.fixture()
def table_test_table_version(catalog: Catalog) -> Table:
return catalog.load_table("default.test_table_version")


TABLE_NAME = ("default", "t1")


Expand Down Expand Up @@ -366,3 +371,28 @@ def test_scan_tag(test_positional_mor_deletes: Table) -> None:
def test_scan_branch(test_positional_mor_deletes: Table) -> None:
arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]


@pytest.mark.integration
def test_upgrade_table_version(table_test_table_version: Table) -> None:
assert table_test_table_version.format_version == 1

with table_test_table_version.transaction() as transaction:
transaction.upgrade_table_version(format_version=1)

assert table_test_table_version.format_version == 1

with table_test_table_version.transaction() as transaction:
transaction.upgrade_table_version(format_version=2)

assert table_test_table_version.format_version == 2

with pytest.raises(ValueError) as e: # type: ignore
with table_test_table_version.transaction() as transaction:
transaction.upgrade_table_version(format_version=1)
assert "Cannot downgrade v2 table to v1" in str(e.value)

with pytest.raises(ValueError) as e:
with table_test_table_version.transaction() as transaction:
transaction.upgrade_table_version(format_version=3)
assert "Unsupported table format version: 3" in str(e.value)