From 5ce419f3c91164f7f7d3c443a65eaeb197cc88f0 Mon Sep 17 00:00:00 2001 From: Dheeraj Turaga Date: Sun, 17 Aug 2025 16:12:39 -0500 Subject: [PATCH] Fix task_queued_timeout not working after first DAG run Reset queued_by_job_id when rescheduling stuck tasks to ensure timeout monitoring works correctly across multiple DAG runs. --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 85d743a12301c..6e5011e8685c4 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2055,6 +2055,7 @@ def _reschedule_stuck_task(self, ti: TaskInstance, session: Session): .values( state=TaskInstanceState.SCHEDULED, queued_dttm=None, + queued_by_job_id=None, scheduled_dttm=timezone.utcnow(), ) .execution_options(synchronize_session=False)