From 55a7d1f5ae9c28c375a9a1f7ef04640d140e4ea5 Mon Sep 17 00:00:00 2001 From: Guillem Gimenez Date: Sat, 7 Mar 2026 21:56:56 +0100 Subject: [PATCH 1/3] feat: support cloning Iceberg tables on Snowflake by passing table format to the engine adapter. --- sqlmesh/core/engine_adapter/base.py | 3 ++- sqlmesh/core/engine_adapter/snowflake.py | 6 ++++++ sqlmesh/core/snapshot/evaluator.py | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index e2dbb51459..87fe39dbbd 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1077,10 +1077,11 @@ def clone_table( raise NotImplementedError(f"Engine does not support cloning: {type(self)}") kwargs.pop("rendered_physical_properties", None) + table_kind = kwargs.pop("table_kind", "TABLE") self.execute( exp.Create( this=exp.to_table(target_table_name), - kind="TABLE", + kind=table_kind, replace=replace, exists=exists, clone=exp.Clone( diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index a8eabe070d..e875182be9 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -675,6 +675,12 @@ def clone_table( if isinstance(table_type, exp.TransientProperty): kwargs["properties"] = exp.Properties(expressions=[table_type]) + # Snowflake requires CREATE ICEBERG TABLE ... CLONE for Iceberg tables + # instead of the regular CREATE TABLE ... CLONE + table_format = kwargs.pop("table_format", None) + if table_format and isinstance(table_format, str) and table_format.upper() == "ICEBERG": + kwargs["table_kind"] = f"{table_format.upper()} TABLE" + super().clone_table( target_table_name, source_table_name, diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 1808011854..fc7f9956bf 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1085,6 +1085,7 @@ def _clone_snapshot_in_dev( target_table_name, snapshot.table_name(), rendered_physical_properties=rendered_physical_properties, + table_format=snapshot.model.table_format, ) self._migrate_target_table( target_table_name=target_table_name, From 2d5a831650d3b73ed7b1724c2dc52976508ef08a Mon Sep 17 00:00:00 2001 From: Guillem Gimenez Date: Sat, 7 Mar 2026 22:13:47 +0100 Subject: [PATCH 2/3] feat: Add support for `ALTER ICEBERG TABLE` in Snowflake by passing table format to the `alter_table` method. --- sqlmesh/core/engine_adapter/base.py | 1 + sqlmesh/core/engine_adapter/snowflake.py | 25 ++++++++++++++++++++++++ sqlmesh/core/snapshot/evaluator.py | 2 +- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 87fe39dbbd..7decd02063 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1184,6 +1184,7 @@ def get_alter_operations( def alter_table( self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], + **kwargs: t.Any, ) -> None: """ Performs the alter statements to change the current table into the structure of the target table. diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index e875182be9..a2733e8f49 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -689,6 +689,31 @@ def clone_table( **kwargs, ) + def alter_table( + self, + alter_expressions: t.Union[t.List[exp.Alter], t.List["TableAlterOperation"]], + **kwargs: t.Any, + ) -> None: + # Snowflake requires ALTER ICEBERG TABLE instead of ALTER TABLE for Iceberg tables + table_format = kwargs.pop("table_format", None) + if table_format and isinstance(table_format, str) and table_format.upper() == "ICEBERG": + from sqlmesh.core.schema_diff import TableAlterOperation + + resolved_expressions = [] + for x in alter_expressions: + if isinstance(x, TableAlterOperation): + alter_expr = x.expression + else: + alter_expr = x + alter_expr.args["kind"] = f"{table_format.upper()} TABLE" + resolved_expressions.append(alter_expr) + + with self.transaction(): + for alter_expr in resolved_expressions: + self.execute(alter_expr) + else: + super().alter_table(alter_expressions, **kwargs) + @t.overload def _columns_to_types( self, diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index fc7f9956bf..bc53d7e497 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -2109,7 +2109,7 @@ def migrate( _check_additive_schema_change( snapshot, alter_operations, kwargs["allow_additive_snapshots"] ) - self.adapter.alter_table(alter_operations) + self.adapter.alter_table(alter_operations, table_format=snapshot.model.table_format) # Apply grants after schema migration deployability_index = kwargs.get("deployability_index") From 9c0c1554eb058107d0d0e7df121ca81958cd2064 Mon Sep 17 00:00:00 2001 From: Guillem Gimenez Date: Mon, 9 Mar 2026 00:34:26 +0100 Subject: [PATCH 3/3] feat: Implement special DDL generation for Snowflake Iceberg tables with PARTITION BY to correctly handle property ordering and CTAS limitations. --- sqlmesh/core/engine_adapter/base.py | 5 ++--- sqlmesh/core/engine_adapter/snowflake.py | 5 +++-- sqlmesh/core/snapshot/evaluator.py | 10 ++++++++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 7decd02063..511644b7f9 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -295,8 +295,8 @@ def _get_source_queries( ) for c in target_columns_to_types ] - query_factory = ( - lambda: exp.Select() + query_factory = lambda: ( + exp.Select() .select(*select_columns) .from_(query_or_df.subquery("select_source_columns")) ) @@ -1184,7 +1184,6 @@ def get_alter_operations( def alter_table( self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]], - **kwargs: t.Any, ) -> None: """ Performs the alter statements to change the current table into the structure of the target table. diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index a2733e8f49..add68b4550 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -42,6 +42,7 @@ SnowparkSession, ) from sqlmesh.core.node import IntervalUnit + from sqlmesh.core.schema_diff import TableAlterOperation @set_catalog( @@ -689,7 +690,7 @@ def clone_table( **kwargs, ) - def alter_table( + def alter_table( # type: ignore[override] self, alter_expressions: t.Union[t.List[exp.Alter], t.List["TableAlterOperation"]], **kwargs: t.Any, @@ -712,7 +713,7 @@ def alter_table( for alter_expr in resolved_expressions: self.execute(alter_expr) else: - super().alter_table(alter_expressions, **kwargs) + super().alter_table(alter_expressions) @t.overload def _columns_to_types( diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index bc53d7e497..7e7901f4d9 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1081,11 +1081,14 @@ def _clone_snapshot_in_dev( try: logger.info(f"Cloning table '{source_table_name}' into '{target_table_name}'") + clone_kwargs: t.Dict[str, t.Any] = {} + if snapshot.model.table_format: + clone_kwargs["table_format"] = snapshot.model.table_format adapter.clone_table( target_table_name, snapshot.table_name(), rendered_physical_properties=rendered_physical_properties, - table_format=snapshot.model.table_format, + **clone_kwargs, ) self._migrate_target_table( target_table_name=target_table_name, @@ -2109,7 +2112,10 @@ def migrate( _check_additive_schema_change( snapshot, alter_operations, kwargs["allow_additive_snapshots"] ) - self.adapter.alter_table(alter_operations, table_format=snapshot.model.table_format) + alter_kwargs: t.Dict[str, t.Any] = {} + if snapshot.model.table_format: + alter_kwargs["table_format"] = snapshot.model.table_format + self.adapter.alter_table(alter_operations, **alter_kwargs) # Apply grants after schema migration deployability_index = kwargs.get("deployability_index")