Skip to content

runtime_stage creating appears twice during plan application for new models #5555

@PiotrKatvitski

Description

@PiotrKatvitski

SQLMesh version is 0.224.0.

Consider the following model:

    owner me,
    kind INCREMENTAL_BY_UNIQUE_KEY(
        unique_key (key_col),
        batch_size 1
    ),
    start '2025-10-12',
    cron '0 0 * * *'
);

SELECT @start_ts::varchar(20) as text_col
     , 1 as key_col;

@IF(@runtime_stage = 'creating',
    @init_test()
)

The macro init_test is expected to be run only once during table creation. Here's an example of such macro for table initialization:

import typing as t

from sqlmesh import macro


if t.TYPE_CHECKING:
    from sqlmesh.core.macros import MacroEvaluator

@macro()
def init_test(
    evaluator: "MacroEvaluator",
) -> list:
    this_model = evaluator.this_model
    print(f"Model {this_model}, stage {evaluator.runtime_stage}")
    if evaluator.runtime_stage != "creating":
        return []
    return [
        f"""
    MERGE INTO {this_model}
    USING (SELECT current_timestamp::VARCHAR(20) as text_col, 2 as key_col) old_
    ON {this_model}.key_col = old_.key_col
    WHEN MATCHED THEN UPDATE SET key_col = old_.key_col
    WHEN NOT MATCHED THEN INSERT ("text_col", "key_col") VALUES (old_.text_col, old_.key_col)
"""
    ]

What actually happens is that the macro runs twice. See the following log snippet from sqlmesh plan dev_piotr command:

........
25-10-14 13:43:30,327 - ThreadPoolExecutor-3_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: c0f2afe77932458c931630b75607bcf1 */ CREATE TABLE "sqlmesh__test"."test__test_model__2273005933" AS SELECT CAST('2025-10-12 00:00:00' AS VARCHAR(20)) AS "text_col", 1 AS "key_col" (base.py:2507)
------> 2025-10-14 13:43:31,511 - ThreadPoolExecutor-3_0 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: c0f2afe77932458c931630b75607bcf1 */ MERGE INTO "dwh"."sqlmesh__test"."test__test_model__2273005933" USING (SELECT CAST(GETDATE() AS VARCHAR(20)) AS "text_col", 2 AS "key_col") AS "old_" ON "dwh"."sqlmesh__test"."test__test_model__2273005933"."key_col" = "old_"."key_col" WHEN MATCHED THEN UPDATE SET "key_col" = "old_"."key_col" WHEN NOT MATCHED THEN INSERT ("text_col", "key_col") VALUES ("old_"."text_col", "old_"."key_col") (base.py:2507)
2025-10-14 13:43:54,459 - ThreadPoolExecutor-3_0 - sqlmesh.core.state_sync.db.facade - INFO - Adding interval (2025-10-12 00:00:00, 2025-10-13 00:00:00) for snapshot SnapshotId<"dwh"."test"."test_model": 2436650494> (facade.py:624)
2025-10-14 13:43:54,464 - ThreadPoolExecutor-3_0 - sqlmesh.core.state_sync.db.interval - INFO - Pushing intervals for snapshot SnapshotId<"dwh"."test"."test_model": 2436650494> (interval.py:215)
2025-10-14 13:43:54,737 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Evaluating snapshot SnapshotId<"dwh"."test"."test_model": 2436650494> (evaluator.py:704)
2025-10-14 13:43:55,693 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Inserting data for snapshot SnapshotId<"dwh"."test"."test_model": 2436650494> (evaluator.py:919)
2025-10-14 13:43:55,698 - ThreadPoolExecutor-3_1 - sqlmesh.core.snapshot.evaluator - INFO - Inserting batch (2025-10-13 00:00:00, 2025-10-14 00:00:00) into dwh.sqlmesh__test.test__test_model__2273005933' (evaluator.py:949)
2025-10-14 13:43:55,701 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: c0f2afe77932458c931630b75607bcf1 */ MERGE INTO "sqlmesh__test"."test__test_model__2273005933" USING (SELECT CAST('2025-10-13 00:00:00' AS VARCHAR(20)) AS "text_col", 1 AS "key_col") AS "__MERGE_SOURCE__" ON "sqlmesh__test"."test__test_model__2273005933"."key_col" = "__MERGE_SOURCE__"."key_col" WHEN MATCHED THEN UPDATE SET "text_col" = "__MERGE_SOURCE__"."text_col", "key_col" = "__MERGE_SOURCE__"."key_col" WHEN NOT MATCHED THEN INSERT ("text_col", "key_col") VALUES ("__MERGE_SOURCE__"."text_col", "__MERGE_SOURCE__"."key_col") (base.py:2507)
------> 2025-10-14 13:44:07,183 - ThreadPoolExecutor-3_1 - sqlmesh.core.engine_adapter.base - INFO - Executing SQL: /* SQLMESH_PLAN: c0f2afe77932458c931630b75607bcf1 */ MERGE INTO "dwh"."sqlmesh__test"."test__test_model__2273005933" USING (SELECT CAST(GETDATE() AS VARCHAR(20)) AS "text_col", 2 AS "key_col") AS "old_" ON "dwh"."sqlmesh__test"."test__test_model__2273005933"."key_col" = "old_"."key_col" WHEN MATCHED THEN UPDATE SET "key_col" = "old_"."key_col" WHEN NOT MATCHED THEN INSERT ("text_col", "key_col") VALUES ("old_"."text_col", "old_"."key_col") (base.py:2507)
...........

The rows in the log that I indicated with ------> show that the macro is being executed twice in this test case, even though the model is only created once.

Metadata

Metadata

Assignees

Labels

BugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions