diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 4eebf9967cd2d..ae126fb720061 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -27,9 +27,8 @@ from collections import Counter from dataclasses import dataclass from datetime import timedelta -from functools import lru_cache, partial from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator +from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator from sqlalchemy import and_, delete, func, not_, or_, select, text, update from sqlalchemy.exc import OperationalError @@ -1064,12 +1063,8 @@ def _do_scheduling(self, session: Session) -> int: callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) # Send the callbacks after we commit to ensure the context is up to date when it gets run - # cache saves time during scheduling of many dag_runs for same dag - cached_get_dag: Callable[[str], DAG | None] = lru_cache()( - partial(self.dagbag.get_dag, session=session) - ) for dag_run, callback_to_run in callback_tuples: - dag = cached_get_dag(dag_run.dag_id) + dag = dag_run.dag or self.dagbag.get_dag(dag_run.dag_id, session=session) if dag: # Sending callbacks there as in standalone_dag_processor they are adding to the database, # so it must be done outside of prohibit_commit. @@ -1373,13 +1368,8 @@ def _update_state(dag: DAG, dag_run: DagRun): tags={"dag_id": dag.dag_id}, ) - # cache saves time during scheduling of many dag_runs for same dag - cached_get_dag: Callable[[str], DAG | None] = lru_cache()( - partial(self.dagbag.get_dag, session=session) - ) - for dag_run in dag_runs: - dag = dag_run.dag = cached_get_dag(dag_run.dag_id) + dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) if not dag: self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)