diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 23ab0b21db..0c9635a7c2 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -2081,16 +2081,20 @@ def missing_intervals( continue snapshot_end_date = existing_interval_end + snapshot_start_date = max( + to_datetime(snapshot_start_date), + to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)), + ) + if snapshot_start_date > to_datetime(snapshot_end_date): + continue + missing_interval_end_date = snapshot_end_date node_end_date = snapshot.node.end if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)): missing_interval_end_date = node_end_date intervals = snapshot.missing_intervals( - max( - to_datetime(snapshot_start_date), - to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)), - ), + snapshot_start_date, missing_interval_end_date, execution_time=execution_time, deployability_index=deployability_index, @@ -2295,14 +2299,16 @@ def start_date( if not isinstance(snapshots, dict): snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots} - earliest = snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now())) - - for parent in snapshot.parents: - if parent in snapshots: - earliest = min( - earliest, - start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to), - ) + parent_starts = [ + start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to) + for parent in snapshot.parents + if parent in snapshots + ] + earliest = ( + min(parent_starts) + if parent_starts + else snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now())) + ) cache[key] = earliest return earliest diff --git a/tests/core/integration/test_restatement.py b/tests/core/integration/test_restatement.py index a00d8d7ab5..3694efce31 100644 --- a/tests/core/integration/test_restatement.py +++ b/tests/core/integration/test_restatement.py @@ -1880,3 +1880,56 @@ def _run_restatement_plan(tmp_path: Path, config: Config, q: queue.Queue): assert len(model_a.intervals) set_console(orig_console) + + +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_restatement_plan_outside_parent_date_range(init_and_plan_context: t.Callable): + context, _ = init_and_plan_context("examples/sushi") + + context.upsert_model("sushi.items", start="2023-01-06") + context.upsert_model("sushi.orders", start="2023-01-06") + # One of the parents should derive the start from its own parents for the issue + # to reproduce + context.upsert_model("sushi.order_items", start=None) + context.upsert_model("sushi.waiter_revenue_by_day", start="2023-01-01", audits=[]) + + context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True) + + restated_snapshot = context.get_snapshot("sushi.waiter_revenue_by_day") + downstream_snapshot = context.get_snapshot("sushi.top_waiters") + + plan = context.plan_builder( + restate_models=["sushi.waiter_revenue_by_day"], + start="2023-01-01", + end="2023-01-01", + min_intervals=0, + ).build() + assert plan.snapshots != context.snapshots + + assert plan.requires_backfill + assert plan.restatements == { + restated_snapshot.snapshot_id: (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), + downstream_snapshot.snapshot_id: (to_timestamp("2023-01-01"), to_timestamp("2023-01-09")), + } + assert plan.missing_intervals == [ + SnapshotIntervals( + snapshot_id=downstream_snapshot.snapshot_id, + intervals=[ + (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), + (to_timestamp("2023-01-02"), to_timestamp("2023-01-03")), + (to_timestamp("2023-01-03"), to_timestamp("2023-01-04")), + (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), + (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), + ], + ), + SnapshotIntervals( + snapshot_id=restated_snapshot.snapshot_id, + intervals=[ + (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), + ], + ), + ] + + context.apply(plan) diff --git a/tests/core/integration/test_run.py b/tests/core/integration/test_run.py index a3b84b5a9e..c3e6626ad0 100644 --- a/tests/core/integration/test_run.py +++ b/tests/core/integration/test_run.py @@ -109,6 +109,7 @@ def test_run_respects_excluded_transitive_dependencies(init_and_plan_context: t. kind FULL, allow_partials true, cron '@hourly', + start '2023-01-01', ); SELECT @execution_ts AS execution_ts