Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ Formatting settings for the `sqlmesh format` command and UI.

Configuration for the `sqlmesh janitor` command.

| Option | Description | Type | Required |
|--------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
| Option | Description | Type | Required |
|---------------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
| `expired_snapshots_batch_size` | Maximum number of expired snapshots to clean in a single batch (Default: 200) | int | N |


## UI
Expand Down
12 changes: 12 additions & 0 deletions sqlmesh/core/config/janitor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
from __future__ import annotations

import typing as t

from sqlmesh.core.config.base import BaseConfig
from sqlmesh.utils.pydantic import field_validator


class JanitorConfig(BaseConfig):
"""The configuration for the janitor.

Args:
warn_on_delete_failure: Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views.
expired_snapshots_batch_size: Maximum number of expired snapshots to clean in a single batch.
"""

warn_on_delete_failure: bool = False
expired_snapshots_batch_size: t.Optional[int] = None

@field_validator("expired_snapshots_batch_size", mode="before")
@classmethod
def _validate_batch_size(cls, value: int) -> int:
batch_size = int(value)
if batch_size <= 0:
raise ValueError("expired_snapshots_batch_size must be greater than 0")
return batch_size
20 changes: 8 additions & 12 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
StateSync,
cleanup_expired_views,
)
from sqlmesh.core.state_sync.common import delete_expired_snapshots
from sqlmesh.core.table_diff import TableDiff
from sqlmesh.core.test import (
ModelTextTestResult,
Expand Down Expand Up @@ -2852,19 +2853,14 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
# Clean up expired environments by removing their views and schemas
self._cleanup_environments(current_ts=current_ts)

cleanup_targets = self.state_sync.get_expired_snapshots(
ignore_ttl=ignore_ttl, current_ts=current_ts
)

# Remove the expired snapshots tables
self.snapshot_evaluator.cleanup(
target_snapshots=cleanup_targets,
on_complete=self.console.update_cleanup_progress,
delete_expired_snapshots(
self.state_sync,
self.snapshot_evaluator,
current_ts=current_ts,
ignore_ttl=ignore_ttl,
console=self.console,
batch_size=self.config.janitor.expired_snapshots_batch_size,
)

# Delete the expired snapshot records from the state sync
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl, current_ts=current_ts)

self.state_sync.compact_intervals()

def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
Expand Down
51 changes: 25 additions & 26 deletions sqlmesh/core/state_sync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from sqlmesh import migrations
from sqlmesh.core.environment import (
Environment,
EnvironmentNamingInfo,
EnvironmentStatements,
EnvironmentSummary,
)
Expand All @@ -21,17 +20,20 @@
SnapshotIdLike,
SnapshotIdAndVersionLike,
SnapshotInfoLike,
SnapshotTableCleanupTask,
SnapshotTableInfo,
SnapshotNameVersion,
SnapshotIdAndVersion,
)
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
from sqlmesh.utils import major_minor
from sqlmesh.utils.date import TimeLike
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator
from sqlmesh.core.state_sync.common import StateStream
from sqlmesh.utils.pydantic import PydanticModel, field_validator
from sqlmesh.core.state_sync.common import (
StateStream,
ExpiredSnapshotBatch,
PromotionResult,
ExpiredBatchRange,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -72,20 +74,6 @@ def _schema_version_validator(cls, v: t.Any) -> int:
SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1


class PromotionResult(PydanticModel):
added: t.List[SnapshotTableInfo]
removed: t.List[SnapshotTableInfo]
removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]

@field_validator("removed_environment_naming_info")
def _validate_removed_environment_naming_info(
cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
) -> t.Optional[EnvironmentNamingInfo]:
if v and not info.data.get("removed"):
raise ValueError("removed_environment_naming_info must be None if removed is empty")
return v


class StateReader(abc.ABC):
"""Abstract base class for read-only operations on snapshot and environment state."""

Expand Down Expand Up @@ -315,15 +303,21 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStre

@abc.abstractmethod
def get_expired_snapshots(
self, current_ts: t.Optional[int] = None, ignore_ttl: bool = False
) -> t.List[SnapshotTableCleanupTask]:
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
self,
*,
batch_range: ExpiredBatchRange,
current_ts: t.Optional[int] = None,
ignore_ttl: bool = False,
) -> t.Optional[ExpiredSnapshotBatch]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a reason to make this t.Optional vs just returning an empty list if there are no expired snapshots like it did before?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is correct

"""Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).

Expired snapshots are snapshots that have exceeded their time-to-live
and are no longer in use within an environment.
Args:
current_ts: Timestamp used to evaluate expiration.
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
batch_range: The range of the batch to fetch.

Returns:
The list of table cleanup tasks.
A batch describing expired snapshots or None if no snapshots are pending cleanup.
"""

@abc.abstractmethod
Expand Down Expand Up @@ -363,16 +357,21 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:

@abc.abstractmethod
def delete_expired_snapshots(
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
self,
batch_range: ExpiredBatchRange,
ignore_ttl: bool = False,
current_ts: t.Optional[int] = None,
) -> None:
"""Removes expired snapshots.

Expired snapshots are snapshots that have exceeded their time-to-live
and are no longer in use within an environment.

Args:
batch_range: The range of snapshots to delete in this batch.
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
all snapshots that are not referenced in any environment
current_ts: Timestamp used to evaluate expiration.
"""

@abc.abstractmethod
Expand Down
13 changes: 10 additions & 3 deletions sqlmesh/core/state_sync/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync
from sqlmesh.core.state_sync.common import ExpiredBatchRange
from sqlmesh.utils.date import TimeLike, now_timestamp


Expand Down Expand Up @@ -108,11 +109,17 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
self.state_sync.delete_snapshots(snapshot_ids)

def delete_expired_snapshots(
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
self,
batch_range: ExpiredBatchRange,
ignore_ttl: bool = False,
current_ts: t.Optional[int] = None,
) -> None:
current_ts = current_ts or now_timestamp()
self.snapshot_cache.clear()
self.state_sync.delete_expired_snapshots(current_ts=current_ts, ignore_ttl=ignore_ttl)
self.state_sync.delete_expired_snapshots(
batch_range=batch_range,
ignore_ttl=ignore_ttl,
current_ts=current_ts,
)

def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
for snapshot_intervals in snapshots_intervals:
Expand Down
Loading