diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index d34233fb6d..ccb854d974 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -578,21 +578,23 @@ def _categorize_snapshot( if mode == AutoCategorizationMode.FULL: snapshot.categorize_as(SnapshotChangeCategory.BREAKING) elif self._context_diff.indirectly_modified(snapshot.name): - categories = [] + all_upstream_categories = set() + direct_parent_categories = set() for p_id in dag.upstream(s_id): parent = self._context_diff.snapshots.get(p_id) if parent and self._is_new_snapshot(parent): - categories.append(parent.change_category) + all_upstream_categories.add(parent.change_category) + if p_id in snapshot.parents: + direct_parent_categories.add(parent.change_category) - if not categories or any( - category.is_breaking or category.is_indirect_breaking - for category in categories - if category + if not direct_parent_categories or direct_parent_categories.intersection( + {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} ): snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING) - elif any(category.is_forward_only for category in categories if category): + elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories: + # FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) else: snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 71eafcd5a1..9cd5b27050 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -1428,6 +1428,58 @@ def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_contex ) +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_breaking_only_impacts_immediate_children(init_and_plan_context: t.Callable): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + breaking_model = context.get_model("sushi.orders") + breaking_model = breaking_model.copy(update={"stamp": "force new version"}) + context.upsert_model(breaking_model) + breaking_snapshot = context.get_snapshot(breaking_model, raise_if_missing=True) + + non_breaking_model = context.get_model("sushi.waiter_revenue_by_day") + context.upsert_model(add_projection_to_model(t.cast(SqlModel, non_breaking_model))) + non_breaking_snapshot = context.get_snapshot(non_breaking_model, raise_if_missing=True) + top_waiter_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) + + plan_builder = context.plan_builder("dev", skip_tests=True, enable_preview=False) + plan_builder.set_choice(breaking_snapshot, SnapshotChangeCategory.BREAKING) + plan = plan_builder.build() + assert ( + plan.context_diff.snapshots[breaking_snapshot.snapshot_id].change_category + == SnapshotChangeCategory.BREAKING + ) + assert ( + plan.context_diff.snapshots[non_breaking_snapshot.snapshot_id].change_category + == SnapshotChangeCategory.NON_BREAKING + ) + assert ( + plan.context_diff.snapshots[top_waiter_snapshot.snapshot_id].change_category + == SnapshotChangeCategory.INDIRECT_NON_BREAKING + ) + assert plan.start == to_timestamp("2023-01-01") + assert not any(i.snapshot_id == top_waiter_snapshot.snapshot_id for i in plan.missing_intervals) + + context.apply(plan) + assert ( + not context.plan_builder("dev", skip_tests=True, enable_preview=False) + .build() + .requires_backfill + ) + + # Deploy everything to prod. + plan = context.plan_builder("prod", skip_tests=True).build() + assert not plan.missing_intervals + + context.apply(plan) + assert ( + not context.plan_builder("prod", skip_tests=True, enable_preview=False) + .build() + .requires_backfill + ) + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_run_with_select_models( init_and_plan_context: t.Callable, diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index aab053e19a..dcb9876204 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -1746,7 +1746,7 @@ def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFix assert updated_snapshot_a.change_category == SnapshotChangeCategory.BREAKING assert updated_snapshot_b.change_category == SnapshotChangeCategory.FORWARD_ONLY - assert updated_snapshot_c.change_category == SnapshotChangeCategory.INDIRECT_BREAKING + assert updated_snapshot_c.change_category == SnapshotChangeCategory.FORWARD_ONLY assert updated_snapshot_d.change_category == SnapshotChangeCategory.INDIRECT_BREAKING deployability_index = DeployabilityIndex.create(