diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index e2dbb51459..511644b7f9 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -295,8 +295,8 @@ def _get_source_queries( ) for c in target_columns_to_types ] - query_factory = ( - lambda: exp.Select() + query_factory = lambda: ( + exp.Select() .select(*select_columns) .from_(query_or_df.subquery("select_source_columns")) ) @@ -1077,10 +1077,11 @@ def clone_table( raise NotImplementedError(f"Engine does not support cloning: {type(self)}") kwargs.pop("rendered_physical_properties", None) + table_kind = kwargs.pop("table_kind", "TABLE") self.execute( exp.Create( this=exp.to_table(target_table_name), - kind="TABLE", + kind=table_kind, replace=replace, exists=exists, clone=exp.Clone( diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index a8eabe070d..add68b4550 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -42,6 +42,7 @@ SnowparkSession, ) from sqlmesh.core.node import IntervalUnit + from sqlmesh.core.schema_diff import TableAlterOperation @set_catalog( @@ -675,6 +676,12 @@ def clone_table( if isinstance(table_type, exp.TransientProperty): kwargs["properties"] = exp.Properties(expressions=[table_type]) + # Snowflake requires CREATE ICEBERG TABLE ... CLONE for Iceberg tables + # instead of the regular CREATE TABLE ... CLONE + table_format = kwargs.pop("table_format", None) + if table_format and isinstance(table_format, str) and table_format.upper() == "ICEBERG": + kwargs["table_kind"] = f"{table_format.upper()} TABLE" + super().clone_table( target_table_name, source_table_name, @@ -683,6 +690,31 @@ def clone_table( **kwargs, ) + def alter_table( # type: ignore[override] + self, + alter_expressions: t.Union[t.List[exp.Alter], t.List["TableAlterOperation"]], + **kwargs: t.Any, + ) -> None: + # Snowflake requires ALTER ICEBERG TABLE instead of ALTER TABLE for Iceberg tables + table_format = kwargs.pop("table_format", None) + if table_format and isinstance(table_format, str) and table_format.upper() == "ICEBERG": + from sqlmesh.core.schema_diff import TableAlterOperation + + resolved_expressions = [] + for x in alter_expressions: + if isinstance(x, TableAlterOperation): + alter_expr = x.expression + else: + alter_expr = x + alter_expr.args["kind"] = f"{table_format.upper()} TABLE" + resolved_expressions.append(alter_expr) + + with self.transaction(): + for alter_expr in resolved_expressions: + self.execute(alter_expr) + else: + super().alter_table(alter_expressions) + @t.overload def _columns_to_types( self, diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 1808011854..7e7901f4d9 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1081,10 +1081,14 @@ def _clone_snapshot_in_dev( try: logger.info(f"Cloning table '{source_table_name}' into '{target_table_name}'") + clone_kwargs: t.Dict[str, t.Any] = {} + if snapshot.model.table_format: + clone_kwargs["table_format"] = snapshot.model.table_format adapter.clone_table( target_table_name, snapshot.table_name(), rendered_physical_properties=rendered_physical_properties, + **clone_kwargs, ) self._migrate_target_table( target_table_name=target_table_name, @@ -2108,7 +2112,10 @@ def migrate( _check_additive_schema_change( snapshot, alter_operations, kwargs["allow_additive_snapshots"] ) - self.adapter.alter_table(alter_operations) + alter_kwargs: t.Dict[str, t.Any] = {} + if snapshot.model.table_format: + alter_kwargs["table_format"] = snapshot.model.table_format + self.adapter.alter_table(alter_operations, **alter_kwargs) # Apply grants after schema migration deployability_index = kwargs.get("deployability_index")