From a8470bb9ec16b78570f2f3c6d1eb5036288615ac Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 6 Mar 2025 15:23:30 -0800 Subject: [PATCH] Fix: Simplify the deployability index creation --- sqlmesh/core/snapshot/definition.py | 39 +++++++++++++---------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 7b73d0a4f1..4efb2f26c7 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1438,21 +1438,24 @@ def create( ) -> DeployabilityIndex: if not isinstance(snapshots, dict): snapshots = {s.snapshot_id: s for s in snapshots} - dag = snapshots_to_dag(snapshots.values()) - reversed_dag = dag.reversed.graph deployability_mapping: t.Dict[SnapshotId, bool] = {} + children_deployability_mapping: t.Dict[SnapshotId, bool] = {} representative_shared_version_ids: t.Set[SnapshotId] = set() start_date_cache: t.Optional[t.Dict[str, datetime]] = {} - def _visit(node: SnapshotId, deployable: bool = True) -> None: - if deployability_mapping.get(node) in (False, deployable) and ( - deployable or node not in representative_shared_version_ids - ): - return - - if deployable and node in snapshots: + dag = snapshots_to_dag(snapshots.values()) + for node in dag: + if node not in snapshots: + continue + # Make sure that the node is deployable according to all its parents + this_deployable = all( + children_deployability_mapping[p_id] + for p_id in snapshots[node].parents + if p_id in children_deployability_mapping + ) + if this_deployable: snapshot = snapshots[node] is_forward_only_model = snapshot.is_model and snapshot.model.forward_only has_auto_restatement = ( @@ -1481,8 +1484,8 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None: if not snapshot.is_paused or snapshot.is_indirect_non_breaking: # This snapshot represents what's currently deployed in prod. representative_shared_version_ids.add(node) - else: - this_deployable = True + + # A child can still be deployable even if its parent is not children_deployable = ( is_valid_start and not ( @@ -1491,18 +1494,12 @@ def _visit(node: SnapshotId, deployable: bool = True) -> None: and not has_auto_restatement ) else: - this_deployable, children_deployable = False, False - if node in snapshots and not snapshots[node].is_paused: + children_deployable = False + if not snapshots[node].is_paused: representative_shared_version_ids.add(node) - else: - representative_shared_version_ids.discard(node) - - deployability_mapping[node] = deployability_mapping.get(node, True) and this_deployable - for child in reversed_dag[node]: - _visit(child, children_deployable) - for node in dag.roots: - _visit(node) + deployability_mapping[node] = this_deployable + children_deployability_mapping[node] = children_deployable deployable_ids = { snapshot_id for snapshot_id, deployable in deployability_mapping.items() if deployable