Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sqlmesh/core/state_sync/db/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,12 @@ def get_expired_snapshots(
promoted_snapshot_ids = {
snapshot.snapshot_id
for environment in environments
for snapshot in environment.snapshots
for snapshot in (
environment.snapshots
if environment.finalized_ts is not None
# If the environment is not finalized, check both the current snapshots and the previous finalized snapshots
else [*environment.snapshots, *(environment.previous_finalized_snapshots or [])]
)
}

if promoted_snapshot_ids:
Expand Down
77 changes: 77 additions & 0 deletions tests/core/state_sync/test_state_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,83 @@ def test_delete_expired_snapshots_promoted(
assert not state_sync.get_snapshots(all_snapshots)


def test_delete_expired_snapshots_previous_finalized_snapshots(
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
):
"""Test that expired snapshots are protected if they are part of previous finalized snapshots
in a non-finalized environment."""
now_ts = now_timestamp()

# Create an old snapshot that will be expired
old_snapshot = make_snapshot(
SqlModel(
name="a",
query=parse_one("select a, ds"),
),
)
old_snapshot.ttl = "in 10 seconds"
old_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

# Create a new snapshot
new_snapshot = make_snapshot(
SqlModel(
name="a",
query=parse_one("select a, b, ds"),
),
)
new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

state_sync.push_snapshots([old_snapshot, new_snapshot])

# Promote the old snapshot to an environment and finalize it
env = Environment(
name="test_environment",
snapshots=[old_snapshot.table_info],
start_at="2022-01-01",
end_at="2022-01-01",
plan_id="test_plan_id",
previous_plan_id="test_plan_id",
)
state_sync.promote(env)
state_sync.finalize(env)

# Verify old snapshot is not cleaned up because it's in a finalized environment
assert not _get_cleanup_tasks(state_sync)

# Now promote the new snapshot to the same environment (this simulates a new plan)
# The environment will have previous_finalized_snapshots set to the old snapshot
# and will not be finalized yet
env = Environment(
name="test_environment",
snapshots=[new_snapshot.table_info],
previous_finalized_snapshots=[old_snapshot.table_info],
start_at="2022-01-01",
end_at="2022-01-01",
plan_id="new_plan_id",
previous_plan_id="test_plan_id",
)
state_sync.promote(env)

# Manually update the snapshots updated_ts to simulate expiration
state_sync.engine_adapter.execute(
f"UPDATE sqlmesh._snapshots SET updated_ts = {now_ts - 15000} WHERE name = '{old_snapshot.name}' AND identifier = '{old_snapshot.identifier}'"
)

# The old snapshot should still not be cleaned up because it's part of
# previous_finalized_snapshots in a non-finalized environment
assert not _get_cleanup_tasks(state_sync)
state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range())
assert state_sync.snapshots_exist([old_snapshot.snapshot_id]) == {old_snapshot.snapshot_id}

# Once the environment is finalized, the expired snapshot should be removed successfully
state_sync.finalize(env)
assert _get_cleanup_tasks(state_sync) == [
SnapshotTableCleanupTask(snapshot=old_snapshot.table_info, dev_table_only=False),
]
state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range())
assert not state_sync.snapshots_exist([old_snapshot.snapshot_id])


def test_delete_expired_snapshots_dev_table_cleanup_only(
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
):
Expand Down