Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,21 +578,23 @@ def _categorize_snapshot(
if mode == AutoCategorizationMode.FULL:
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
elif self._context_diff.indirectly_modified(snapshot.name):
categories = []
all_upstream_categories = set()
direct_parent_categories = set()

for p_id in dag.upstream(s_id):
parent = self._context_diff.snapshots.get(p_id)

if parent and self._is_new_snapshot(parent):
categories.append(parent.change_category)
all_upstream_categories.add(parent.change_category)
if p_id in snapshot.parents:
direct_parent_categories.add(parent.change_category)

if not categories or any(
category.is_breaking or category.is_indirect_breaking
for category in categories
if category
if not direct_parent_categories or direct_parent_categories.intersection(
{SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
):
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING)
elif any(category.is_forward_only for category in categories if category):
elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories:
# FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
else:
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING)
Expand Down
52 changes: 52 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,58 @@ def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_contex
)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_breaking_only_impacts_immediate_children(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
context.apply(plan)

breaking_model = context.get_model("sushi.orders")
breaking_model = breaking_model.copy(update={"stamp": "force new version"})
context.upsert_model(breaking_model)
breaking_snapshot = context.get_snapshot(breaking_model, raise_if_missing=True)

non_breaking_model = context.get_model("sushi.waiter_revenue_by_day")
context.upsert_model(add_projection_to_model(t.cast(SqlModel, non_breaking_model)))
non_breaking_snapshot = context.get_snapshot(non_breaking_model, raise_if_missing=True)
top_waiter_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True)

plan_builder = context.plan_builder("dev", skip_tests=True, enable_preview=False)
plan_builder.set_choice(breaking_snapshot, SnapshotChangeCategory.BREAKING)
plan = plan_builder.build()
assert (
plan.context_diff.snapshots[breaking_snapshot.snapshot_id].change_category
== SnapshotChangeCategory.BREAKING
)
assert (
plan.context_diff.snapshots[non_breaking_snapshot.snapshot_id].change_category
== SnapshotChangeCategory.NON_BREAKING
)
assert (
plan.context_diff.snapshots[top_waiter_snapshot.snapshot_id].change_category
== SnapshotChangeCategory.INDIRECT_NON_BREAKING
)
assert plan.start == to_timestamp("2023-01-01")
assert not any(i.snapshot_id == top_waiter_snapshot.snapshot_id for i in plan.missing_intervals)

context.apply(plan)
assert (
not context.plan_builder("dev", skip_tests=True, enable_preview=False)
.build()
.requires_backfill
)

# Deploy everything to prod.
plan = context.plan_builder("prod", skip_tests=True).build()
assert not plan.missing_intervals

context.apply(plan)
assert (
not context.plan_builder("prod", skip_tests=True, enable_preview=False)
.build()
.requires_backfill
)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_run_with_select_models(
init_and_plan_context: t.Callable,
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFix

assert updated_snapshot_a.change_category == SnapshotChangeCategory.BREAKING
assert updated_snapshot_b.change_category == SnapshotChangeCategory.FORWARD_ONLY
assert updated_snapshot_c.change_category == SnapshotChangeCategory.INDIRECT_BREAKING
assert updated_snapshot_c.change_category == SnapshotChangeCategory.FORWARD_ONLY
assert updated_snapshot_d.change_category == SnapshotChangeCategory.INDIRECT_BREAKING

deployability_index = DeployabilityIndex.create(
Expand Down