Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sqlmesh/core/state_sync/db/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlmesh.core.state_sync.db.utils import (
snapshot_name_version_filter,
snapshot_id_filter,
fetchone,
fetchall,
create_batches,
)
Expand Down Expand Up @@ -385,7 +386,7 @@ def update_auto_restatements(

def count(self) -> int:
"""Counts the number of snapshots in the state."""
result = self.engine_adapter.fetchone(exp.select("COUNT(*)").from_(self.snapshots_table))
result = fetchone(self.engine_adapter, exp.select("COUNT(*)").from_(self.snapshots_table))
return result[0] if result else 0

def clear_cache(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@ def migrate(state_sync, **kwargs): # type: ignore
]
engine_adapter.execute(add_column_exps)

if engine_adapter.dialect == "databricks":
# Databricks will throw an error like:
# > databricks.sql.exc.ServerOperationError: [DELTA_UNSUPPORTED_DROP_COLUMN] DROP COLUMN is not supported for your Delta table.
# when we try to drop `expiration_ts` below unless we set delta.columnMapping.mode to 'name'
alter_table_exp = exp.Alter(
this=exp.to_table(snapshots_table),
kind="TABLE",
actions=[
exp.AlterSet(
expressions=[
exp.Properties(
expressions=[
exp.Property(
this=exp.Literal.string("delta.columnMapping.mode"),
value=exp.Literal.string("name"),
)
]
)
]
)
],
)
engine_adapter.execute(alter_table_exp)

drop_column_exp = exp.Alter(
this=exp.to_table(snapshots_table),
kind="TABLE",
Expand Down
36 changes: 36 additions & 0 deletions tests/core/engine_adapter/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
from sqlmesh import Config, Context
from sqlmesh.cli.example_project import init_example_project
from sqlmesh.core.config import load_config_from_paths
from sqlmesh.core.config.connection import ConnectionConfig
import sqlmesh.core.dialect as d
from sqlmesh.core.dialect import select_from_values
from sqlmesh.core.model import Model, load_sql_based_model
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
from sqlmesh.core.engine_adapter.mixins import RowDiffMixin
from sqlmesh.core.model.definition import create_sql_model
from sqlmesh.core.plan import Plan
from sqlmesh.core.state_sync.db import EngineAdapterStateSync
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
from sqlmesh.utils.date import now, to_date, to_time_column
from sqlmesh.core.table_diff import TableDiff
Expand Down Expand Up @@ -2667,3 +2669,37 @@ def test_table_diff_identical_dataset(ctx: TestContext):
assert row_diff.stats["t_only_count"] == 0
assert row_diff.s_sample.shape == (0, 3)
assert row_diff.t_sample.shape == (0, 3)


def test_state_migrate_from_scratch(ctx: TestContext):
if ctx.test_type != "query":
pytest.skip("state migration tests are only relevant for query")

test_schema = ctx.add_test_suffix("state")
ctx._schemas.append(test_schema) # so it gets cleaned up when the test finishes

def _use_warehouse_as_state_connection(gateway_name: str, config: Config):
warehouse_connection = config.gateways[gateway_name].connection
assert isinstance(warehouse_connection, ConnectionConfig)
if warehouse_connection.is_forbidden_for_state_sync:
pytest.skip(
f"{warehouse_connection.type_} doesnt support being used as a state connection"
)

# this triggers the fallback to using the warehouse as a state connection
config.gateways[gateway_name].state_connection = None
assert config.get_state_connection(gateway_name) is None

config.gateways[gateway_name].state_schema = test_schema

sqlmesh_context = ctx.create_context(config_mutator=_use_warehouse_as_state_connection)
assert sqlmesh_context.config.get_state_schema(ctx.gateway) == test_schema

state_sync = (
sqlmesh_context._new_state_sync()
) # this prevents migrate() being called which it does if you access the state_sync property
assert isinstance(state_sync, EngineAdapterStateSync)
assert state_sync.engine_adapter.dialect == ctx.dialect

# will throw if one of the migrations produces an error, which can happen if we forget to take quoting or normalization into account
sqlmesh_context.migrate()