From f39fe9e55b52e778c944ed978f19821d8aed630d Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Fri, 7 Mar 2025 03:31:58 +0000 Subject: [PATCH 1/3] Fix: State sync snapshot count() quoting --- .circleci/continue_config.yml | 8 ++--- sqlmesh/core/state_sync/db/snapshot.py | 3 +- .../integration/test_integration.py | 36 +++++++++++++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index 8a1000c6b5..ff5dd5acf8 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -314,10 +314,10 @@ workflows: - bigquery - clickhouse-cloud - athena - filters: - branches: - only: - - main + #filters: + # branches: + # only: + # - main - trigger_private_tests: requires: - style_and_slow_tests diff --git a/sqlmesh/core/state_sync/db/snapshot.py b/sqlmesh/core/state_sync/db/snapshot.py index 6b7d64c57b..e46f7d0151 100644 --- a/sqlmesh/core/state_sync/db/snapshot.py +++ b/sqlmesh/core/state_sync/db/snapshot.py @@ -14,6 +14,7 @@ from sqlmesh.core.state_sync.db.utils import ( snapshot_name_version_filter, snapshot_id_filter, + fetchone, fetchall, create_batches, ) @@ -385,7 +386,7 @@ def update_auto_restatements( def count(self) -> int: """Counts the number of snapshots in the state.""" - result = self.engine_adapter.fetchone(exp.select("COUNT(*)").from_(self.snapshots_table)) + result = fetchone(self.engine_adapter, exp.select("COUNT(*)").from_(self.snapshots_table)) return result[0] if result else 0 def clear_cache(self) -> None: diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 23241e264e..ac73e866b8 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -18,6 +18,7 @@ from sqlmesh import Config, Context from sqlmesh.cli.example_project import init_example_project from sqlmesh.core.config import load_config_from_paths +from sqlmesh.core.config.connection import ConnectionConfig import sqlmesh.core.dialect as d from sqlmesh.core.dialect import select_from_values from sqlmesh.core.model import Model, load_sql_based_model @@ -25,6 +26,7 @@ from sqlmesh.core.engine_adapter.mixins import RowDiffMixin from sqlmesh.core.model.definition import create_sql_model from sqlmesh.core.plan import Plan +from sqlmesh.core.state_sync.db import EngineAdapterStateSync from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory from sqlmesh.utils.date import now, to_date, to_time_column from sqlmesh.core.table_diff import TableDiff @@ -2667,3 +2669,37 @@ def test_table_diff_identical_dataset(ctx: TestContext): assert row_diff.stats["t_only_count"] == 0 assert row_diff.s_sample.shape == (0, 3) assert row_diff.t_sample.shape == (0, 3) + + +def test_state_migrate_from_scratch(ctx: TestContext): + if ctx.test_type != "query": + pytest.skip("state migration tests are only relevant for query") + + test_schema = ctx.add_test_suffix("state") + ctx._schemas.append(test_schema) # so it gets cleaned up when the test finishes + + def _use_warehouse_as_state_connection(gateway_name: str, config: Config): + warehouse_connection = config.gateways[gateway_name].connection + assert isinstance(warehouse_connection, ConnectionConfig) + if warehouse_connection.is_forbidden_for_state_sync: + pytest.skip( + f"{warehouse_connection.type_} doesnt support being used as a state connection" + ) + + # this triggers the fallback to using the warehouse as a state connection + config.gateways[gateway_name].state_connection = None + assert config.get_state_connection(gateway_name) is None + + config.gateways[gateway_name].state_schema = test_schema + + sqlmesh_context = ctx.create_context(config_mutator=_use_warehouse_as_state_connection) + assert sqlmesh_context.config.get_state_schema(ctx.gateway) == test_schema + + state_sync = ( + sqlmesh_context._new_state_sync() + ) # this prevents migrate() being called which it does if you access the state_sync property + assert isinstance(state_sync, EngineAdapterStateSync) + assert state_sync.engine_adapter.dialect == ctx.dialect + + # will throw if one of the migrations produces an error, which can happen if we forget to take quoting or normalization into account + sqlmesh_context.migrate() From 8816caf69cd540cc7b3ed93cd13a7b60acb56d24 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Fri, 7 Mar 2025 04:37:54 +0000 Subject: [PATCH 2/3] Fix Databricks bug in migration v0055 --- ...used_ts_ttl_ms_unrestorable_to_snapshot.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/sqlmesh/migrations/v0055_add_updated_ts_unpaused_ts_ttl_ms_unrestorable_to_snapshot.py b/sqlmesh/migrations/v0055_add_updated_ts_unpaused_ts_ttl_ms_unrestorable_to_snapshot.py index 0676aa50b4..8ef88b0a66 100644 --- a/sqlmesh/migrations/v0055_add_updated_ts_unpaused_ts_ttl_ms_unrestorable_to_snapshot.py +++ b/sqlmesh/migrations/v0055_add_updated_ts_unpaused_ts_ttl_ms_unrestorable_to_snapshot.py @@ -46,6 +46,30 @@ def migrate(state_sync, **kwargs): # type: ignore ] engine_adapter.execute(add_column_exps) + if engine_adapter.dialect == "databricks": + # Databricks will throw an error like: + # > databricks.sql.exc.ServerOperationError: [DELTA_UNSUPPORTED_DROP_COLUMN] DROP COLUMN is not supported for your Delta table. + # when we try to drop `expiration_ts` below unless we set delta.columnMapping.mode to 'name' + alter_table_exp = exp.Alter( + this=exp.to_table(snapshots_table), + kind="TABLE", + actions=[ + exp.AlterSet( + expressions=[ + exp.Properties( + expressions=[ + exp.Property( + this=exp.Literal.string("delta.columnMapping.mode"), + value=exp.Literal.string("name"), + ) + ] + ) + ] + ) + ], + ) + engine_adapter.execute(alter_table_exp) + drop_column_exp = exp.Alter( this=exp.to_table(snapshots_table), kind="TABLE", From 9808881b8ba843f649652a152d328257545e71dc Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Fri, 7 Mar 2025 05:08:19 +0000 Subject: [PATCH 3/3] Reinstate branch filter --- .circleci/continue_config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index ff5dd5acf8..8a1000c6b5 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -314,10 +314,10 @@ workflows: - bigquery - clickhouse-cloud - athena - #filters: - # branches: - # only: - # - main + filters: + branches: + only: + - main - trigger_private_tests: requires: - style_and_slow_tests