From 1720d5599808ee04c87e957161dc23209acd04ac Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 11 Mar 2025 15:29:46 -0700 Subject: [PATCH 1/2] Fix: Direct breaking category should only impact direct children snapshot --- sqlmesh/core/plan/builder.py | 14 +++++---- tests/core/test_integration.py | 52 ++++++++++++++++++++++++++++++++++ tests/core/test_plan.py | 2 +- 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index d34233fb6d..6561cb0b0a 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -578,21 +578,25 @@ def _categorize_snapshot( if mode == AutoCategorizationMode.FULL: snapshot.categorize_as(SnapshotChangeCategory.BREAKING) elif self._context_diff.indirectly_modified(snapshot.name): - categories = [] + all_upstream_categories = [] + direct_parent_categories = [] for p_id in dag.upstream(s_id): parent = self._context_diff.snapshots.get(p_id) if parent and self._is_new_snapshot(parent): - categories.append(parent.change_category) + all_upstream_categories.append(parent.change_category) + if p_id in snapshot.parents: + direct_parent_categories.append(parent.change_category) - if not categories or any( + if not direct_parent_categories or any( category.is_breaking or category.is_indirect_breaking - for category in categories + for category in direct_parent_categories if category ): snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING) - elif any(category.is_forward_only for category in categories if category): + elif any(category.is_forward_only for category in all_upstream_categories if category): + # FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) else: snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 71eafcd5a1..9cd5b27050 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -1428,6 +1428,58 @@ def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_contex ) +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_breaking_only_impacts_immediate_children(init_and_plan_context: t.Callable): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + breaking_model = context.get_model("sushi.orders") + breaking_model = breaking_model.copy(update={"stamp": "force new version"}) + context.upsert_model(breaking_model) + breaking_snapshot = context.get_snapshot(breaking_model, raise_if_missing=True) + + non_breaking_model = context.get_model("sushi.waiter_revenue_by_day") + context.upsert_model(add_projection_to_model(t.cast(SqlModel, non_breaking_model))) + non_breaking_snapshot = context.get_snapshot(non_breaking_model, raise_if_missing=True) + top_waiter_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) + + plan_builder = context.plan_builder("dev", skip_tests=True, enable_preview=False) + plan_builder.set_choice(breaking_snapshot, SnapshotChangeCategory.BREAKING) + plan = plan_builder.build() + assert ( + plan.context_diff.snapshots[breaking_snapshot.snapshot_id].change_category + == SnapshotChangeCategory.BREAKING + ) + assert ( + plan.context_diff.snapshots[non_breaking_snapshot.snapshot_id].change_category + == SnapshotChangeCategory.NON_BREAKING + ) + assert ( + plan.context_diff.snapshots[top_waiter_snapshot.snapshot_id].change_category + == SnapshotChangeCategory.INDIRECT_NON_BREAKING + ) + assert plan.start == to_timestamp("2023-01-01") + assert not any(i.snapshot_id == top_waiter_snapshot.snapshot_id for i in plan.missing_intervals) + + context.apply(plan) + assert ( + not context.plan_builder("dev", skip_tests=True, enable_preview=False) + .build() + .requires_backfill + ) + + # Deploy everything to prod. + plan = context.plan_builder("prod", skip_tests=True).build() + assert not plan.missing_intervals + + context.apply(plan) + assert ( + not context.plan_builder("prod", skip_tests=True, enable_preview=False) + .build() + .requires_backfill + ) + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_run_with_select_models( init_and_plan_context: t.Callable, diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index aab053e19a..dcb9876204 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -1746,7 +1746,7 @@ def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFix assert updated_snapshot_a.change_category == SnapshotChangeCategory.BREAKING assert updated_snapshot_b.change_category == SnapshotChangeCategory.FORWARD_ONLY - assert updated_snapshot_c.change_category == SnapshotChangeCategory.INDIRECT_BREAKING + assert updated_snapshot_c.change_category == SnapshotChangeCategory.FORWARD_ONLY assert updated_snapshot_d.change_category == SnapshotChangeCategory.INDIRECT_BREAKING deployability_index = DeployabilityIndex.create( From ec5b0505840bc1c2d488d4d36adbd4609b84e8af Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Tue, 11 Mar 2025 21:37:48 -0700 Subject: [PATCH 2/2] Address comments --- sqlmesh/core/plan/builder.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 6561cb0b0a..ccb854d974 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -578,24 +578,22 @@ def _categorize_snapshot( if mode == AutoCategorizationMode.FULL: snapshot.categorize_as(SnapshotChangeCategory.BREAKING) elif self._context_diff.indirectly_modified(snapshot.name): - all_upstream_categories = [] - direct_parent_categories = [] + all_upstream_categories = set() + direct_parent_categories = set() for p_id in dag.upstream(s_id): parent = self._context_diff.snapshots.get(p_id) if parent and self._is_new_snapshot(parent): - all_upstream_categories.append(parent.change_category) + all_upstream_categories.add(parent.change_category) if p_id in snapshot.parents: - direct_parent_categories.append(parent.change_category) + direct_parent_categories.add(parent.change_category) - if not direct_parent_categories or any( - category.is_breaking or category.is_indirect_breaking - for category in direct_parent_categories - if category + if not direct_parent_categories or direct_parent_categories.intersection( + {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} ): snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING) - elif any(category.is_forward_only for category in all_upstream_categories if category): + elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories: # FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) else: