Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1438,21 +1438,24 @@ def create(
) -> DeployabilityIndex:
if not isinstance(snapshots, dict):
snapshots = {s.snapshot_id: s for s in snapshots}
dag = snapshots_to_dag(snapshots.values())
reversed_dag = dag.reversed.graph

deployability_mapping: t.Dict[SnapshotId, bool] = {}
children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
representative_shared_version_ids: t.Set[SnapshotId] = set()

start_date_cache: t.Optional[t.Dict[str, datetime]] = {}

def _visit(node: SnapshotId, deployable: bool = True) -> None:
if deployability_mapping.get(node) in (False, deployable) and (
deployable or node not in representative_shared_version_ids
):
return

if deployable and node in snapshots:
dag = snapshots_to_dag(snapshots.values())
for node in dag:
if node not in snapshots:
continue
# Make sure that the node is deployable according to all its parents
this_deployable = all(
children_deployability_mapping[p_id]
for p_id in snapshots[node].parents
if p_id in children_deployability_mapping
)
if this_deployable:
snapshot = snapshots[node]
is_forward_only_model = snapshot.is_model and snapshot.model.forward_only
has_auto_restatement = (
Expand Down Expand Up @@ -1481,8 +1484,8 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None:
if not snapshot.is_paused or snapshot.is_indirect_non_breaking:
# This snapshot represents what's currently deployed in prod.
representative_shared_version_ids.add(node)
else:
this_deployable = True

# A child can still be deployable even if its parent is not
children_deployable = (
is_valid_start
and not (
Expand All @@ -1491,18 +1494,12 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None:
and not has_auto_restatement
)
else:
this_deployable, children_deployable = False, False
if node in snapshots and not snapshots[node].is_paused:
children_deployable = False
if not snapshots[node].is_paused:
representative_shared_version_ids.add(node)
else:
representative_shared_version_ids.discard(node)

deployability_mapping[node] = deployability_mapping.get(node, True) and this_deployable
for child in reversed_dag[node]:
_visit(child, children_deployable)

for node in dag.roots:
_visit(node)
deployability_mapping[node] = this_deployable
children_deployability_mapping[node] = children_deployable

deployable_ids = {
snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable
Expand Down