From b8b983ad8b31f33bb1e689b6c335641c1cf3345a Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Wed, 8 Oct 2025 21:48:42 -0700 Subject: [PATCH 1/2] fix: ensure _get_data_objects respects snapshots with different schema or catalog --- sqlmesh/core/snapshot/evaluator.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index f7aea5cff1..cbf0266659 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1593,14 +1593,14 @@ def _get_data_objects( tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = ( defaultdict(lambda: defaultdict(set)) ) - snapshots_by_table_name: t.Dict[str, Snapshot] = {} + snapshots_by_table_name: t.Dict[t.Tuple[str, str, str], Snapshot] = {} for snapshot in target_snapshots: if not snapshot.is_model or snapshot.is_symbolic: continue table = table_name_callable(snapshot) table_schema = d.schema_(table.db, catalog=table.catalog) tables_by_gateway_and_schema[snapshot.model_gateway][table_schema].add(table.name) - snapshots_by_table_name[table.name] = snapshot + snapshots_by_table_name[(table.catalog, table.db, table.name)] = snapshot def _get_data_objects_in_schema( schema: exp.Table, @@ -1629,7 +1629,10 @@ def _get_data_objects_in_schema( ] existing_objects.extend(objs_for_gateway) - return {snapshots_by_table_name[obj.name].snapshot_id: obj for obj in existing_objects} + return { + snapshots_by_table_name[(obj.catalog or "", obj.schema_name, obj.name)].snapshot_id: obj + for obj in existing_objects + } def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy: From 62acc540e78fad224b74c9230184db9c2dfc6c45 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Thu, 16 Oct 2025 12:26:18 +0300 Subject: [PATCH 2/2] simplify lookup; adapt tests --- sqlmesh/core/snapshot/evaluator.py | 37 +++++++++++++-------------- tests/core/test_snapshot_evaluator.py | 16 ++++++------ 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 4ea90dc5d7..762e4b91ec 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1593,14 +1593,14 @@ def _get_data_objects( tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = ( defaultdict(lambda: defaultdict(set)) ) - snapshots_by_table_name: t.Dict[t.Tuple[str, str, str], Snapshot] = {} + snapshots_by_table_name: t.Dict[exp.Table, t.Dict[str, Snapshot]] = defaultdict(dict) for snapshot in target_snapshots: if not snapshot.is_model or snapshot.is_symbolic: continue table = table_name_callable(snapshot) table_schema = d.schema_(table.db, catalog=table.catalog) tables_by_gateway_and_schema[snapshot.model_gateway][table_schema].add(table.name) - snapshots_by_table_name[(table.catalog, table.db, table.name)] = snapshot + snapshots_by_table_name[table_schema][table.name] = snapshot def _get_data_objects_in_schema( schema: exp.Table, @@ -1613,26 +1613,25 @@ def _get_data_objects_in_schema( ) with self.concurrent_context(): - existing_objects: t.List[DataObject] = [] + snapshot_id_to_obj: t.Dict[SnapshotId, DataObject] = {} # A schema can be shared across multiple engines, so we need to group tables by both gateway and schema for gateway, tables_by_schema in tables_by_gateway_and_schema.items(): - objs_for_gateway = [ - obj - for objs in concurrent_apply_to_values( - list(tables_by_schema), - lambda s: _get_data_objects_in_schema( - schema=s, object_names=tables_by_schema.get(s), gateway=gateway - ), - self.ddl_concurrent_tasks, - ) - for obj in objs - ] - existing_objects.extend(objs_for_gateway) + schema_list = list(tables_by_schema.keys()) + results = concurrent_apply_to_values( + schema_list, + lambda s: _get_data_objects_in_schema( + schema=s, object_names=tables_by_schema.get(s), gateway=gateway + ), + self.ddl_concurrent_tasks, + ) - return { - snapshots_by_table_name[(obj.catalog or "", obj.schema_name, obj.name)].snapshot_id: obj - for obj in existing_objects - } + for schema, objs in zip(schema_list, results): + snapshots_by_name = snapshots_by_table_name.get(schema, {}) + for obj in objs: + if obj.name in snapshots_by_name: + snapshot_id_to_obj[snapshots_by_name[obj.name].snapshot_id] = obj + + return snapshot_id_to_obj def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy: diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 348bddc32b..3a3a1a9376 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1418,7 +1418,7 @@ def columns(table_name): "get_data_objects", return_value=[ DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model__{snapshot.version}", type="table", ) @@ -1500,7 +1500,7 @@ def test_migrate_view( "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", return_value=[ DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model__{snapshot.version}", type="view", ) @@ -1950,7 +1950,7 @@ def columns(table_name): "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", return_value=[ DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model__{snapshot.version}", type=DataObjectType.TABLE, ) @@ -2037,7 +2037,7 @@ def columns(table_name): "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", return_value=[ DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model__{snapshot.version}", type=DataObjectType.TABLE, ) @@ -4016,7 +4016,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc adapter_mock.get_data_objects.return_value = [ DataObject( - schema="test_schema", + schema="sqlmesh__db", name=f"db__model__{new_snapshot.version}", type=DataObjectType.TABLE, ) @@ -4154,7 +4154,7 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): adapter_mock.get_data_objects.return_value = [ DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model__{snapshot.version}", type=DataObjectType.MANAGED_TABLE, ) @@ -4380,12 +4380,12 @@ def columns(table_name): "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", return_value=[ DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model__{snapshot_1.version}", type=DataObjectType.TABLE, ), DataObject( - schema="test_schema", + schema="sqlmesh__test_schema", name=f"test_schema__test_model_2__{snapshot_2.version}", type=DataObjectType.TABLE, ),