Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ def _get_source_queries(
)
for c in target_columns_to_types
]
query_factory = (
lambda: exp.Select()
query_factory = lambda: (
exp.Select()
.select(*select_columns)
.from_(query_or_df.subquery("select_source_columns"))
)
Expand Down Expand Up @@ -1077,10 +1077,11 @@ def clone_table(
raise NotImplementedError(f"Engine does not support cloning: {type(self)}")

kwargs.pop("rendered_physical_properties", None)
table_kind = kwargs.pop("table_kind", "TABLE")
self.execute(
exp.Create(
this=exp.to_table(target_table_name),
kind="TABLE",
kind=table_kind,
replace=replace,
exists=exists,
clone=exp.Clone(
Expand Down
32 changes: 32 additions & 0 deletions sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
SnowparkSession,
)
from sqlmesh.core.node import IntervalUnit
from sqlmesh.core.schema_diff import TableAlterOperation


@set_catalog(
Expand Down Expand Up @@ -675,6 +676,12 @@ def clone_table(
if isinstance(table_type, exp.TransientProperty):
kwargs["properties"] = exp.Properties(expressions=[table_type])

# Snowflake requires CREATE ICEBERG TABLE ... CLONE for Iceberg tables
# instead of the regular CREATE TABLE ... CLONE
table_format = kwargs.pop("table_format", None)
if table_format and isinstance(table_format, str) and table_format.upper() == "ICEBERG":
kwargs["table_kind"] = f"{table_format.upper()} TABLE"

super().clone_table(
target_table_name,
source_table_name,
Expand All @@ -683,6 +690,31 @@ def clone_table(
**kwargs,
)

def alter_table( # type: ignore[override]
self,
alter_expressions: t.Union[t.List[exp.Alter], t.List["TableAlterOperation"]],
**kwargs: t.Any,
) -> None:
# Snowflake requires ALTER ICEBERG TABLE instead of ALTER TABLE for Iceberg tables
table_format = kwargs.pop("table_format", None)
if table_format and isinstance(table_format, str) and table_format.upper() == "ICEBERG":
from sqlmesh.core.schema_diff import TableAlterOperation

resolved_expressions = []
for x in alter_expressions:
if isinstance(x, TableAlterOperation):
alter_expr = x.expression
else:
alter_expr = x
alter_expr.args["kind"] = f"{table_format.upper()} TABLE"
resolved_expressions.append(alter_expr)

with self.transaction():
for alter_expr in resolved_expressions:
self.execute(alter_expr)
else:
super().alter_table(alter_expressions)

@t.overload
def _columns_to_types(
self,
Expand Down
9 changes: 8 additions & 1 deletion sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1081,10 +1081,14 @@ def _clone_snapshot_in_dev(

try:
logger.info(f"Cloning table '{source_table_name}' into '{target_table_name}'")
clone_kwargs: t.Dict[str, t.Any] = {}
if snapshot.model.table_format:
clone_kwargs["table_format"] = snapshot.model.table_format
adapter.clone_table(
target_table_name,
snapshot.table_name(),
rendered_physical_properties=rendered_physical_properties,
**clone_kwargs,
)
self._migrate_target_table(
target_table_name=target_table_name,
Expand Down Expand Up @@ -2108,7 +2112,10 @@ def migrate(
_check_additive_schema_change(
snapshot, alter_operations, kwargs["allow_additive_snapshots"]
)
self.adapter.alter_table(alter_operations)
alter_kwargs: t.Dict[str, t.Any] = {}
if snapshot.model.table_format:
alter_kwargs["table_format"] = snapshot.model.table_format
self.adapter.alter_table(alter_operations, **alter_kwargs)

# Apply grants after schema migration
deployability_index = kwargs.get("deployability_index")
Expand Down