From 2352f35bb3b16d4939883a6a1d4c81ddcab043d4 Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Sat, 28 Oct 2023 18:57:35 +0100
Subject: [PATCH] Don't get DAG out of DagBag when we already have it

Two things here:

1. By the point we are looking at the "callbacks", `dagrun.dag` will already
   be set (the `or dagbag.get_dag` is a safety precaution; it might not be
   required or worth it).
2. DagBag already _is_ a cache. We don't need an extra caching layer on top
   of it.

This "soft reverts" #30704 and removes the lru_cache.
---
 airflow/jobs/scheduler_job_runner.py | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 4eebf9967cd2d..ae126fb720061 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -27,9 +27,8 @@
 from collections import Counter
 from dataclasses import dataclass
 from datetime import timedelta
-from functools import lru_cache, partial
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
+from typing import TYPE_CHECKING, Any, Collection, Iterable, Iterator
 
 from sqlalchemy import and_, delete, func, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
@@ -1064,12 +1063,8 @@ def _do_scheduling(self, session: Session) -> int:
             callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
 
         # Send the callbacks after we commit to ensure the context is up to date when it gets run
-        # cache saves time during scheduling of many dag_runs for same dag
-        cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
-            partial(self.dagbag.get_dag, session=session)
-        )
         for dag_run, callback_to_run in callback_tuples:
-            dag = cached_get_dag(dag_run.dag_id)
+            dag = dag_run.dag or self.dagbag.get_dag(dag_run.dag_id, session=session)
             if dag:
                 # Sending callbacks there as in standalone_dag_processor they are adding to the database,
                 # so it must be done outside of prohibit_commit.
@@ -1373,13 +1368,8 @@ def _update_state(dag: DAG, dag_run: DagRun):
                 tags={"dag_id": dag.dag_id},
             )
 
-        # cache saves time during scheduling of many dag_runs for same dag
-        cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
-            partial(self.dagbag.get_dag, session=session)
-        )
-
        for dag_run in dag_runs:
-            dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
+            dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
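
For illustration, a minimal Python sketch of point 2 of the commit message, using a hypothetical `TinyDagBag` (not the real Airflow DagBag or its API): when a getter already memoizes results in an instance dict, wrapping it in an extra `lru_cache` layer buys almost nothing.

```python
# Minimal sketch, assuming a toy stand-in for DagBag; the names here
# (TinyDagBag, _load_from_db) are hypothetical, not Airflow's API.
from functools import lru_cache, partial


class TinyDagBag:
    """Toy bag whose get_dag() already memoizes results in self.dags."""

    def __init__(self):
        self.dags = {}  # dag_id -> dag object: the built-in cache

    def _load_from_db(self, dag_id, session=None):
        print(f"loading {dag_id} from DB")  # stands in for the expensive path
        return object()

    def get_dag(self, dag_id, session=None):
        # First call loads; later calls are plain dict lookups, no DB round trip.
        if dag_id not in self.dags:
            self.dags[dag_id] = self._load_from_db(dag_id, session=session)
        return self.dags[dag_id]


bag = TinyDagBag()

# The pattern this patch removes: an lru_cache wrapped around get_dag.
cached_get_dag = lru_cache()(partial(bag.get_dag, session=None))
cached_get_dag("example_dag")  # loads once
cached_get_dag("example_dag")  # lru_cache hit

# Calling the bag directly is already cheap, so the extra layer adds little.
bag.get_dag("example_dag")
bag.get_dag("example_dag")
```

The outer `lru_cache` keys results by dag_id only, which is effectively what the bag's own cache already does, so dropping it keeps one cache instead of two.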