From 3c7779bd848edde12ac716b4cc9ac696aaf9accb Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 9 Oct 2025 14:16:52 -0700 Subject: [PATCH 1/2] Fix: Check whether the expired snapshot is among previously finalized snapshots if the environment is not finalized --- sqlmesh/core/state_sync/db/snapshot.py | 7 ++- tests/core/state_sync/test_state_sync.py | 77 ++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/state_sync/db/snapshot.py b/sqlmesh/core/state_sync/db/snapshot.py index 4565990d65..d584c69d65 100644 --- a/sqlmesh/core/state_sync/db/snapshot.py +++ b/sqlmesh/core/state_sync/db/snapshot.py @@ -185,7 +185,12 @@ def get_expired_snapshots( promoted_snapshot_ids = { snapshot.snapshot_id for environment in environments - for snapshot in environment.snapshots + for snapshot in ( + environment.snapshots + if environment.finalized_ts is not None + # If the environment is not finalized, check both the current snapshots and the previous finalized snapshots + else [*environment.snapshots, *(environment.previous_finalized_snapshots or [])] + ) } if promoted_snapshot_ids: diff --git a/tests/core/state_sync/test_state_sync.py b/tests/core/state_sync/test_state_sync.py index bd01dfc652..c36d2e5061 100644 --- a/tests/core/state_sync/test_state_sync.py +++ b/tests/core/state_sync/test_state_sync.py @@ -1638,6 +1638,83 @@ def test_delete_expired_snapshots_promoted( assert not state_sync.get_snapshots(all_snapshots) +def test_delete_expired_snapshots_previous_finalized_snapshots( + state_sync: EngineAdapterStateSync, make_snapshot: t.Callable +): + """Test that expired snapshots are protected if they are part of previous finalized snapshots + in a non-finalized environment.""" + now_ts = now_timestamp() + + # Create an old snapshot that will be expired + old_snapshot = make_snapshot( + SqlModel( + name="a", + query=parse_one("select a, ds"), + ), + ) + old_snapshot.ttl = "in 10 seconds" + old_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + # Create a new snapshot + new_snapshot = make_snapshot( + SqlModel( + name="a", + query=parse_one("select a, b, ds"), + ), + ) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + state_sync.push_snapshots([old_snapshot, new_snapshot]) + + # Promote the old snapshot to an environment and finalize it + env = Environment( + name="test_environment", + snapshots=[old_snapshot.table_info], + start_at="2022-01-01", + end_at="2022-01-01", + plan_id="test_plan_id", + previous_plan_id="test_plan_id", + ) + state_sync.promote(env) + state_sync.finalize(env) + + # Verify old snapshot is not cleaned up because it's in a finalized environment + assert not _get_cleanup_tasks(state_sync) + + # Now promote the new snapshot to the same environment (this simulates a new plan) + # The environment will have previous_finalized_snapshots set to the old snapshot + # and will not be finalized yet + env = Environment( + name="test_environment", + snapshots=[new_snapshot.table_info], + previous_finalized_snapshots=[old_snapshot.table_info], + start_at="2022-01-01", + end_at="2022-01-01", + plan_id="new_plan_id", + previous_plan_id="test_plan_id", + ) + state_sync.promote(env) + + # Manually update the snapshtos updated_ts to simulate expiration + state_sync.engine_adapter.execute( + f"UPDATE sqlmesh._snapshots SET updated_ts = {now_ts - 15000} WHERE name = '{old_snapshot.name}' AND identifier = '{old_snapshot.identifier}'" + ) + + # The old snapshot should still not be cleaned up because it's part of + # previous_finalized_snapshots in a non-finalized environment + assert not _get_cleanup_tasks(state_sync) + state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range()) + assert state_sync.snapshots_exist([old_snapshot.snapshot_id]) == {old_snapshot.snapshot_id} + + # Once the environment is finalized, the expired snapshot should be removed successfully + state_sync.finalize(env) + assert _get_cleanup_tasks(state_sync) == [ + SnapshotTableCleanupTask(snapshot=old_snapshot.table_info, dev_table_only=False), + ] + state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range()) + assert not state_sync.snapshots_exist([old_snapshot.snapshot_id]) + + def test_delete_expired_snapshots_dev_table_cleanup_only( state_sync: EngineAdapterStateSync, make_snapshot: t.Callable ): From 2d0776811b49f7f1c04e6d9d7d24bcfdf557142e Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 9 Oct 2025 15:34:02 -0700 Subject: [PATCH 2/2] Update tests/core/state_sync/test_state_sync.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/core/state_sync/test_state_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/state_sync/test_state_sync.py b/tests/core/state_sync/test_state_sync.py index c36d2e5061..88e168c216 100644 --- a/tests/core/state_sync/test_state_sync.py +++ b/tests/core/state_sync/test_state_sync.py @@ -1695,7 +1695,7 @@ def test_delete_expired_snapshots_previous_finalized_snapshots( ) state_sync.promote(env) - # Manually update the snapshtos updated_ts to simulate expiration + # Manually update the snapshots updated_ts to simulate expiration state_sync.engine_adapter.execute( f"UPDATE sqlmesh._snapshots SET updated_ts = {now_ts - 15000} WHERE name = '{old_snapshot.name}' AND identifier = '{old_snapshot.identifier}'" )