From 9681561d965af375a1bc9a0d67eacbcc871d0f03 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 29 Jul 2025 03:09:33 +0530 Subject: [PATCH] Fix scheduler heartbeat timeout failures with ``DetachedInstanceError`` Resolves `DetachedInstanceError` when the scheduler processes task instances that have timed out during heartbeat detection. The error occurred when Pydantic validation of `TIRunContext` attempted to access the `consumed_asset_events` relationship on `DagRun` objects that had been detached from the SQLAlchemy session. Root cause: The main scheduler loop calls `session.expunge_all()`, which detaches all objects from the session. Later, when processing heartbeat timeouts, the scheduler creates `TIRunContext` objects that trigger Pydantic validation of `dag_run.consumed_asset_events`, causing `DetachedInstanceError` on the lazy-loaded relationship. Solution: Add `selectinload(DagRun.consumed_asset_events)` to the heartbeat timeout query to eagerly load the relationship before objects are detached. This minimal fix loads only the required relationship without over-eager loading of nested fields that aren't accessed during heartbeat processing. The fix affects all DAG types since `consumed_asset_events` is initialized as an empty list on all `DagRun` objects, not just asset-triggered DAGs. 
Longer term, using `back_populates` (with `lazy="selectin"`) on the relationship might be better, so that we don't need to remember to eager-load it in each query: https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html https://docs.sqlalchemy.org/en/20/orm/relationship_api.html#sqlalchemy.orm.relationship.params.back_populates --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index cb1c69fc7e624..e3411d4241773 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2264,7 +2264,7 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T task_instances_without_heartbeats = session.scalars( select(TI) .options(selectinload(TI.dag_model)) - .options(selectinload(TI.dag_run)) + .options(selectinload(TI.dag_run).selectinload(DagRun.consumed_asset_events)) .options(selectinload(TI.dag_version)) .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql") .join(DM, TI.dag_id == DM.dag_id)