diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 775ac759ffb64..128a82d7eedb3 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -28,9 +28,8 @@ from collections import Counter from dataclasses import dataclass from datetime import datetime, timedelta -from functools import lru_cache, partial from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator +from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator from sqlalchemy import and_, func, not_, or_, text from sqlalchemy.exc import OperationalError @@ -1053,13 +1052,8 @@ def _do_scheduling(self, session: Session) -> int: callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) # Send the callbacks after we commit to ensure the context is up to date when it gets run - # cache saves time during scheduling of many dag_runs for same dag - cached_get_dag: Callable[[str], DAG | None] = lru_cache()( - partial(self.dagbag.get_dag, session=session) - ) for dag_run, callback_to_run in callback_tuples: - dag = cached_get_dag(dag_run.dag_id) - + dag = self.dagbag.get_dag(dag_run.dag_id, session=session) if not dag: self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id) continue @@ -1323,14 +1317,8 @@ def _update_state(dag: DAG, dag_run: DagRun): tags={"dag_id": dag.dag_id}, ) - # cache saves time during scheduling of many dag_runs for same dag - cached_get_dag: Callable[[str], DAG | None] = lru_cache()( - partial(self.dagbag.get_dag, session=session) - ) - for dag_run in dag_runs: - dag = dag_run.dag = cached_get_dag(dag_run.dag_id) - + dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) if not dag: self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id) continue