Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@
CachingStateSync,
StateReader,
StateSync,
cleanup_expired_views,
)
from sqlmesh.core.state_sync.common import delete_expired_snapshots
from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots
from sqlmesh.core.table_diff import TableDiff
from sqlmesh.core.test import (
ModelTextTestResult,
Expand Down
181 changes: 181 additions & 0 deletions sqlmesh/core/janitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
from __future__ import annotations

import typing as t

from sqlglot import exp

from sqlmesh.core.engine_adapter import EngineAdapter
from sqlmesh.core.console import Console
from sqlmesh.core.dialect import schema_
from sqlmesh.core.environment import Environment
from sqlmesh.core.snapshot import SnapshotEvaluator
from sqlmesh.core.state_sync import StateSync
from sqlmesh.core.state_sync.common import (
logger,
iter_expired_snapshot_batches,
RowBoundary,
ExpiredBatchRange,
)
from sqlmesh.utils.errors import SQLMeshError


def cleanup_expired_views(
    default_adapter: EngineAdapter,
    engine_adapters: t.Dict[str, EngineAdapter],
    environments: t.List[Environment],
    warn_on_delete_failure: bool = False,
    console: t.Optional[Console] = None,
) -> None:
    """Drop the virtual-layer objects (views, schemas, catalogs) of expired environments.

    Args:
        default_adapter: Adapter used when an environment's virtual layer is not gateway managed.
        engine_adapters: Mapping from gateway name to its engine adapter.
        environments: The expired environments whose virtual layer should be removed.
        warn_on_delete_failure: If True, log a warning instead of raising when a drop fails.
        console: Optional console used to report cleanup progress.

    Raises:
        SQLMeshError: If a drop fails and ``warn_on_delete_failure`` is False.
    """

    def _report_drop_failure(message: str, error: Exception) -> None:
        # Single place for the warn-vs-raise failure policy shared by all drop loops below.
        if warn_on_delete_failure:
            logger.warning(message)
        else:
            raise SQLMeshError(message) from error

    # Environments with a schema/catalog suffix are cleaned up by dropping the whole
    # schema or catalog; table-suffixed environments require per-view drops.
    expired_schema_or_catalog_environments = [
        environment
        for environment in environments
        if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
    ]
    expired_table_environments = [
        environment for environment in environments if environment.suffix_target.is_table
    ]

    # We have to use the corresponding adapter if the virtual layer is gateway managed
    def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
        if gateway_managed and gateway:
            return engine_adapters.get(gateway, default_adapter)
        return default_adapter

    catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
    schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()

    # Collect schemas and catalogs to drop; the set comprehension dedupes targets that
    # are shared by multiple snapshots/environments.
    for engine_adapter, expired_catalog, expired_schema, suffix_target in {
        (
            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
            snapshot.qualified_view_name.catalog_for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
            snapshot.qualified_view_name.schema_for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
            environment.suffix_target,
        )
        for environment in expired_schema_or_catalog_environments
        for snapshot in environment.snapshots
        if snapshot.is_model and not snapshot.is_symbolic
    }:
        if suffix_target.is_catalog:
            if expired_catalog:
                catalogs_to_drop.add((engine_adapter, expired_catalog))
        else:
            schema = schema_(expired_schema, expired_catalog)
            schemas_to_drop.add((engine_adapter, schema))

    # Drop the views for the expired environments
    for engine_adapter, expired_view in {
        (
            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
            snapshot.qualified_view_name.for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
        )
        for environment in expired_table_environments
        for snapshot in environment.snapshots
        if snapshot.is_model and not snapshot.is_symbolic
    }:
        try:
            engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
            if console:
                console.update_cleanup_progress(expired_view)
        except Exception as e:
            _report_drop_failure(
                f"Failed to drop the expired environment view '{expired_view}': {e}", e
            )

    # Drop the schemas for the expired environments
    for engine_adapter, schema in schemas_to_drop:
        try:
            engine_adapter.drop_schema(
                schema,
                ignore_if_not_exists=True,
                cascade=True,
            )
            if console:
                console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
        except Exception as e:
            _report_drop_failure(
                f"Failed to drop the expired environment schema '{schema}': {e}", e
            )

    # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
    # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
    for engine_adapter, catalog in catalogs_to_drop:
        if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
            try:
                engine_adapter.drop_catalog(catalog)
                if console:
                    console.update_cleanup_progress(catalog)
            except Exception as e:
                _report_drop_failure(
                    f"Failed to drop the expired environment catalog '{catalog}': {e}", e
                )


def delete_expired_snapshots(
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
    console: t.Optional[Console] = None,
) -> int:
    """Delete all expired snapshots in batches.

    This helper function encapsulates the logic for deleting expired snapshots in batches,
    eliminating code duplication across different use cases.

    Args:
        state_sync: StateSync instance to query and delete expired snapshots from.
        snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch.
        console: Optional console for reporting progress.

    Returns:
        The total number of deleted expired snapshots.
    """
    # Fix: the docstring promised the total count but the function previously
    # returned None; it now returns the count (backward compatible for callers
    # that ignore the return value).
    num_expired_snapshots = 0
    for batch in iter_expired_snapshot_batches(
        state_reader=state_sync,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
        batch_size=batch_size,
    ):
        # A batch end is either a concrete row boundary or a plain size limit;
        # log whichever applies for traceability.
        end_info = (
            f"updated_ts={batch.batch_range.end.updated_ts}"
            if isinstance(batch.batch_range.end, RowBoundary)
            else f"limit={batch.batch_range.end.batch_size}"
        )
        logger.info(
            "Processing batch of size %s with end %s",
            len(batch.expired_snapshot_ids),
            end_info,
        )
        # Drop the physical tables before deleting state rows so a failure here
        # leaves the state intact and the cleanup can be retried.
        snapshot_evaluator.cleanup(
            target_snapshots=batch.cleanup_tasks,
            on_complete=console.update_cleanup_progress if console else None,
        )
        # Delete everything from the lowest boundary up to this batch's end.
        state_sync.delete_expired_snapshots(
            batch_range=ExpiredBatchRange(
                start=RowBoundary.lowest_boundary(),
                end=batch.batch_range.end,
            ),
            ignore_ttl=ignore_ttl,
        )
        logger.info("Cleaned up expired snapshots batch")
        num_expired_snapshots += len(batch.expired_snapshot_ids)
    logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
    return num_expired_snapshots
1 change: 0 additions & 1 deletion sqlmesh/core/state_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@
Versions as Versions,
)
from sqlmesh.core.state_sync.cache import CachingStateSync as CachingStateSync
from sqlmesh.core.state_sync.common import cleanup_expired_views as cleanup_expired_views
from sqlmesh.core.state_sync.db import EngineAdapterStateSync as EngineAdapterStateSync
169 changes: 1 addition & 168 deletions sqlmesh/core/state_sync/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,132 +11,23 @@
from pydantic_core.core_schema import ValidationInfo
from sqlglot import exp

from sqlmesh.core.console import Console
from sqlmesh.core.dialect import schema_
from sqlmesh.utils.pydantic import PydanticModel, field_validator
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentNamingInfo
from sqlmesh.utils.errors import SQLMeshError
from sqlmesh.core.snapshot import (
Snapshot,
SnapshotEvaluator,
SnapshotId,
SnapshotTableCleanupTask,
SnapshotTableInfo,
)

if t.TYPE_CHECKING:
from sqlmesh.core.engine_adapter.base import EngineAdapter
from sqlmesh.core.state_sync.base import Versions, StateReader, StateSync
from sqlmesh.core.state_sync.base import Versions, StateReader

logger = logging.getLogger(__name__)

EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200


def cleanup_expired_views(
    default_adapter: EngineAdapter,
    engine_adapters: t.Dict[str, EngineAdapter],
    environments: t.List[Environment],
    warn_on_delete_failure: bool = False,
    console: t.Optional[Console] = None,
) -> None:
    """Drop the virtual-layer objects (views, schemas, catalogs) of expired environments.

    Args:
        default_adapter: Adapter used when an environment's virtual layer is not gateway managed.
        engine_adapters: Mapping from gateway name to its engine adapter.
        environments: The expired environments whose virtual layer should be removed.
        warn_on_delete_failure: If True, log a warning instead of raising when a drop fails.
        console: Optional console used to report cleanup progress.

    Raises:
        SQLMeshError: If a drop fails and ``warn_on_delete_failure`` is False.
    """
    # Schema/catalog-suffixed environments are cleaned up by dropping the whole
    # schema or catalog; table-suffixed environments need per-view drops.
    expired_schema_or_catalog_environments = [
        environment
        for environment in environments
        if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
    ]
    expired_table_environments = [
        environment for environment in environments if environment.suffix_target.is_table
    ]

    # We have to use the corresponding adapter if the virtual layer is gateway managed
    def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
        if gateway_managed and gateway:
            return engine_adapters.get(gateway, default_adapter)
        return default_adapter

    catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
    schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()

    # Collect schemas and catalogs to drop
    # NOTE: the set comprehension dedupes targets shared by multiple snapshots.
    for engine_adapter, expired_catalog, expired_schema, suffix_target in {
        (
            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
            snapshot.qualified_view_name.catalog_for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
            snapshot.qualified_view_name.schema_for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
            environment.suffix_target,
        )
        for environment in expired_schema_or_catalog_environments
        for snapshot in environment.snapshots
        if snapshot.is_model and not snapshot.is_symbolic
    }:
        if suffix_target.is_catalog:
            if expired_catalog:
                catalogs_to_drop.add((engine_adapter, expired_catalog))
        else:
            schema = schema_(expired_schema, expired_catalog)
            schemas_to_drop.add((engine_adapter, schema))

    # Drop the views for the expired environments
    for engine_adapter, expired_view in {
        (
            (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
            snapshot.qualified_view_name.for_environment(
                environment.naming_info, dialect=engine_adapter.dialect
            ),
        )
        for environment in expired_table_environments
        for snapshot in environment.snapshots
        if snapshot.is_model and not snapshot.is_symbolic
    }:
        try:
            engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
            if console:
                console.update_cleanup_progress(expired_view)
        except Exception as e:
            message = f"Failed to drop the expired environment view '{expired_view}': {e}"
            # Best-effort mode downgrades drop failures to warnings.
            if warn_on_delete_failure:
                logger.warning(message)
            else:
                raise SQLMeshError(message) from e

    # Drop the schemas for the expired environments
    for engine_adapter, schema in schemas_to_drop:
        try:
            engine_adapter.drop_schema(
                schema,
                ignore_if_not_exists=True,
                cascade=True,
            )
            if console:
                console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
        except Exception as e:
            message = f"Failed to drop the expired environment schema '{schema}': {e}"
            if warn_on_delete_failure:
                logger.warning(message)
            else:
                raise SQLMeshError(message) from e

    # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
    # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
    for engine_adapter, catalog in catalogs_to_drop:
        if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
            try:
                engine_adapter.drop_catalog(catalog)
                if console:
                    console.update_cleanup_progress(catalog)
            except Exception as e:
                message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
                if warn_on_delete_failure:
                    logger.warning(message)
                else:
                    raise SQLMeshError(message) from e


def transactional() -> t.Callable[[t.Callable], t.Callable]:
def decorator(func: t.Callable) -> t.Callable:
@wraps(func)
Expand Down Expand Up @@ -429,61 +320,3 @@ def iter_expired_snapshot_batches(
start=batch.batch_range.end,
end=LimitBoundary(batch_size=batch_size),
)


def delete_expired_snapshots(
    state_sync: StateSync,
    snapshot_evaluator: SnapshotEvaluator,
    *,
    current_ts: int,
    ignore_ttl: bool = False,
    batch_size: t.Optional[int] = None,
    console: t.Optional[Console] = None,
) -> None:
    """Delete all expired snapshots in batches.

    This helper function encapsulates the logic for deleting expired snapshots in batches,
    eliminating code duplication across different use cases.

    Args:
        state_sync: StateSync instance to query and delete expired snapshots from.
        snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
        current_ts: Timestamp used to evaluate expiration.
        ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
        batch_size: Maximum number of snapshots to fetch per batch.
        console: Optional console for reporting progress.

    Returns:
        The total number of deleted expired snapshots.
    """
    # NOTE(review): the docstring documents a return value but the function is
    # annotated `-> None` and never returns the count — confirm intended contract.
    num_expired_snapshots = 0
    for batch in iter_expired_snapshot_batches(
        state_reader=state_sync,
        current_ts=current_ts,
        ignore_ttl=ignore_ttl,
        batch_size=batch_size,
    ):
        # A batch end is either a concrete row boundary or a plain size limit.
        end_info = (
            f"updated_ts={batch.batch_range.end.updated_ts}"
            if isinstance(batch.batch_range.end, RowBoundary)
            else f"limit={batch.batch_range.end.batch_size}"
        )
        logger.info(
            "Processing batch of size %s with end %s",
            len(batch.expired_snapshot_ids),
            end_info,
        )
        # Drop physical tables before deleting state rows so a failure here
        # leaves state intact and the cleanup can be retried.
        snapshot_evaluator.cleanup(
            target_snapshots=batch.cleanup_tasks,
            on_complete=console.update_cleanup_progress if console else None,
        )
        # Delete everything from the lowest boundary up to this batch's end.
        state_sync.delete_expired_snapshots(
            batch_range=ExpiredBatchRange(
                start=RowBoundary.lowest_boundary(),
                end=batch.batch_range.end,
            ),
            ignore_ttl=ignore_ttl,
        )
        logger.info("Cleaned up expired snapshots batch")
        num_expired_snapshots += len(batch.expired_snapshot_ids)
    logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
Loading
Loading