diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index e18a0598b9..4bcbf91ab4 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -27,6 +27,7 @@
     delete,
     insert,
     select,
+    text,
     union,
     update,
 )
@@ -92,6 +93,7 @@ class IcebergTables(SqlCatalogBaseTable):
     table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
     metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
     previous_metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
+    iceberg_type: Mapped[str | None] = mapped_column(String(5), nullable=True, default="TABLE")
 
 
 class IcebergNamespaceProperties(SqlCatalogBaseTable):
@@ -133,6 +135,8 @@ def __init__(self, name: str, **properties: str):
         if init_catalog_tables:
             self._ensure_tables_exist()
 
+        self._update_tables_if_required()
+
     def _ensure_tables_exist(self) -> None:
         with Session(self.engine) as session:
             for table in [IcebergTables, IcebergNamespaceProperties]:
@@ -146,6 +150,15 @@ def _ensure_tables_exist(self) -> None:
                     self.create_tables()
                     return
 
+    def _update_tables_if_required(self) -> None:
+        with Session(self.engine) as session:
+            stmt = f"ALTER TABLE {IcebergTables.__tablename__} ADD COLUMN iceberg_type VARCHAR(5)"
+            try:
+                session.execute(text(stmt))
+                session.commit()
+            except (OperationalError, ProgrammingError):
+                session.rollback()
+
     def create_tables(self) -> None:
         SqlCatalogBaseTable.metadata.create_all(self.engine)
 
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index f6846195fe..59ae00f3b1 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -20,7 +20,7 @@
 from typing import cast
 
 import pytest
-from sqlalchemy import Engine, create_engine, inspect
+from sqlalchemy import Engine, create_engine, inspect, text
 from sqlalchemy.exc import ArgumentError
 
 from pyiceberg.catalog import load_catalog
@@ -261,3 +261,83 @@ def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> None:
 
         # Second close should not raise an exception
         catalog_sqlite.close()
+
+
+def _create_pre_migration_schema_tables(engine: Engine) -> None:
+    with engine.connect() as conn:
+        conn.execute(
+            text(
+                "CREATE TABLE iceberg_tables ("
+                " catalog_name VARCHAR(255) NOT NULL,"
+                " table_namespace VARCHAR(255) NOT NULL,"
+                " table_name VARCHAR(255) NOT NULL,"
+                " metadata_location VARCHAR(1000),"
+                " previous_metadata_location VARCHAR(1000),"
+                " PRIMARY KEY (catalog_name, table_namespace, table_name)"
+                ")"
+            )
+        )
+        conn.execute(
+            text(
+                "CREATE TABLE iceberg_namespace_properties ("
+                " catalog_name VARCHAR(255) NOT NULL,"
+                " namespace VARCHAR(255) NOT NULL,"
+                " property_key VARCHAR(255) NOT NULL,"
+                " property_value VARCHAR(1000) NOT NULL,"
+                " PRIMARY KEY (catalog_name, namespace, property_key)"
+                ")"
+            )
+        )
+        conn.commit()
+
+
+def get_columns(engine: Engine) -> set[str]:
+    return {c["name"] for c in inspect(engine).get_columns("iceberg_tables")}
+
+
+def test_adds_iceberg_type_column_to_old_schema(warehouse: Path) -> None:
+    # Create the old schema tables
+    uri = f"sqlite:////{warehouse}/test-migration-add-col"
+    engine = create_engine(uri)
+    with engine.connect() as conn:
+        conn.execute(
+            text(
+                "CREATE TABLE iceberg_tables ("
+                " catalog_name VARCHAR(255) NOT NULL,"
+                " table_namespace VARCHAR(255) NOT NULL,"
+                " table_name VARCHAR(255) NOT NULL,"
+                " metadata_location VARCHAR(1000),"
+                " previous_metadata_location VARCHAR(1000),"
+                " PRIMARY KEY (catalog_name, table_namespace, table_name)"
+                ")"
+            )
+        )
+        conn.execute(
+            text(
+                "CREATE TABLE iceberg_namespace_properties ("
+                " catalog_name VARCHAR(255) NOT NULL,"
+                " namespace VARCHAR(255) NOT NULL,"
+                " property_key VARCHAR(255) NOT NULL,"
+                " property_value VARCHAR(1000) NOT NULL,"
+                " PRIMARY KEY (catalog_name, namespace, property_key)"
+                ")"
+            )
+        )
+        conn.commit()
+
+    # Verify the column does not exist in the old schema
+    assert "iceberg_type" not in get_columns(engine)
+
+    # Load the catalog and verify the column exists
+    catalog = SqlCatalog("test", uri=uri, warehouse=f"file://{warehouse}", init_catalog_tables="false")
+    assert "iceberg_type" in get_columns(catalog.engine)
+
+
+def test_idempotent_when_column_already_exists(warehouse: Path) -> None:
+    # Verify the column was created by the init_tables call
+    catalog = SqlCatalog("test", uri="sqlite:///:memory:", warehouse=f"file://{warehouse}")
+    assert "iceberg_type" in get_columns(catalog.engine)
+
+    # Verify the method is idempotent by calling it again
+    catalog._update_tables_if_required()
+    assert "iceberg_type" in get_columns(catalog.engine)