diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index 7e27205fc6..de770079d0 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -538,6 +538,10 @@ def run_node(node: SchedulingUnit) -> None: execution_time=execution_time, ) else: + # If batch_index > 0, then the target table must exist since the first batch would have created it + target_table_exists = ( + snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0 + ) audit_results = self.evaluate( snapshot=snapshot, environment_naming_info=environment_naming_info, @@ -548,7 +552,7 @@ def run_node(node: SchedulingUnit) -> None: batch_index=node.batch_index, allow_destructive_snapshots=allow_destructive_snapshots, allow_additive_snapshots=allow_additive_snapshots, - target_table_exists=snapshot.snapshot_id not in snapshots_to_create, + target_table_exists=target_table_exists, selected_models=selected_models, ) diff --git a/tests/core/integration/test_change_scenarios.py b/tests/core/integration/test_change_scenarios.py index 816f41afe6..fb1762220f 100644 --- a/tests/core/integration/test_change_scenarios.py +++ b/tests/core/integration/test_change_scenarios.py @@ -1482,3 +1482,36 @@ def test_annotated_self_referential_model(init_and_plan_context: t.Callable): df = context.fetchdf("SELECT one FROM memory.sushi.test_self_ref") assert len(df) == 0 + + +@time_machine.travel("2023-01-08 00:00:00 UTC") +def test_creating_stage_for_first_batch_only(init_and_plan_context: t.Callable): + context, _ = init_and_plan_context("examples/sushi") + + expressions = d.parse( + """ + MODEL ( + name memory.sushi.test_batch_size, + kind INCREMENTAL_BY_UNIQUE_KEY ( + unique_key one, + batch_size 1, + ), + + start '2023-01-01', + ); + + CREATE SCHEMA IF NOT EXISTS test_schema; + CREATE TABLE IF NOT EXISTS test_schema.creating_counter (a INT); + + SELECT 1::INT AS one; + + @IF(@runtime_stage = 'creating', INSERT INTO test_schema.creating_counter (a) VALUES (1)); + """ + ) + model = load_sql_based_model(expressions) + context.upsert_model(model) + + context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True) + assert ( + context.engine_adapter.fetchone("SELECT COUNT(*) FROM test_schema.creating_counter")[0] == 1 + )