From a4599b4c936ee64a3d759db9f084ed86ce865e45 Mon Sep 17 00:00:00 2001 From: Josh Owen Date: Tue, 8 Aug 2023 21:14:32 -0400 Subject: [PATCH 1/8] Index optimized fast path to avoid more complicated & slower groupby queryplan --- airflow/models/dag.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index beb7fe013152f..ca6d5c5785d39 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2935,15 +2935,26 @@ def bulk_write_to_db( # Skip these queries entirely if no DAGs can be scheduled to save time. if any(dag.timetable.can_be_scheduled for dag in dags): # Get the latest dag run for each existing dag as a single query (avoid n+1 query) - most_recent_subq = ( - select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) - .where( - DagRun.dag_id.in_(existing_dags), - or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED), + if len(existing_dags) == 1: + # Index optimized fast path to avoid more complicated & slower groupby queryplan + most_recent_subq = ( + select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) + .where( + DagRun.dag_id == existing_dags[0], + or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED), + ) + .subquery() + ) + else: + most_recent_subq = ( + select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) + .where( + DagRun.dag_id.in_(existing_dags), + or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED), + ) + .group_by(DagRun.dag_id) + .subquery() ) - .group_by(DagRun.dag_id) - .subquery() - ) most_recent_runs_iter = session.scalars( select(DagRun).where( DagRun.dag_id == most_recent_subq.c.dag_id, From 59f5bac9265587555db05a31b30f7c37f0b13d7c Mon Sep 17 00:00:00 2001 From: Josh Owen Date: Tue, 8 Aug 2023 21:20:02 -0400 Subject: [PATCH 2/8] Update dag.py --- airflow/models/dag.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index ca6d5c5785d39..3e5fd950beb3e 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2938,13 +2938,19 @@ def bulk_write_to_db( if len(existing_dags) == 1: # Index optimized fast path to avoid more complicated & slower groupby queryplan most_recent_subq = ( - select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) + select(func.max(DagRun.execution_date).label("max_execution_date")) .where( DagRun.dag_id == existing_dags[0], or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED), ) .subquery() ) + most_recent_runs_iter = session.scalars( + select(DagRun).where( + DagRun.dag_id == existing_dags[0], + DagRun.execution_date == most_recent_subq.c.max_execution_date, + ) + ) else: most_recent_subq = ( select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) @@ -2954,13 +2960,13 @@ def bulk_write_to_db( ) .group_by(DagRun.dag_id) .subquery() + ) + most_recent_runs_iter = session.scalars( + select(DagRun).where( + DagRun.dag_id == most_recent_subq.c.dag_id, + DagRun.execution_date == most_recent_subq.c.max_execution_date, + ) ) - most_recent_runs_iter = session.scalars( - select(DagRun).where( - DagRun.dag_id == most_recent_subq.c.dag_id, - DagRun.execution_date == most_recent_subq.c.max_execution_date, - ) - ) most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter} # Get 
number of active dagruns for all dags we are processing as a single query. From 166a57b26b7f5bf094070463ec9a40babc96463f Mon Sep 17 00:00:00 2001 From: Josh Owen Date: Wed, 9 Aug 2023 10:34:17 -0400 Subject: [PATCH 3/8] lint --- airflow/models/dag.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 3e5fd950beb3e..eb996b5da8f47 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2941,7 +2941,10 @@ def bulk_write_to_db( select(func.max(DagRun.execution_date).label("max_execution_date")) .where( DagRun.dag_id == existing_dags[0], - or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED), + or_( + DagRun.run_type == DagRunType.BACKFILL_JOB, + DagRun.run_type == DagRunType.SCHEDULED, + ), ) .subquery() ) @@ -2956,11 +2959,14 @@ def bulk_write_to_db( select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) .where( DagRun.dag_id.in_(existing_dags), - or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED), + or_( + DagRun.run_type == DagRunType.BACKFILL_JOB, + DagRun.run_type == DagRunType.SCHEDULED, + ), ) .group_by(DagRun.dag_id) .subquery() - ) + ) most_recent_runs_iter = session.scalars( select(DagRun).where( DagRun.dag_id == most_recent_subq.c.dag_id, From e2a92be63563a416ec6766935532369b93067f5f Mon Sep 17 00:00:00 2001 From: Josh Owen Date: Tue, 15 Aug 2023 05:37:08 -0400 Subject: [PATCH 4/8] fix --- airflow/models/dag.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index eb996b5da8f47..75394788fbcf1 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2937,10 +2937,11 @@ def bulk_write_to_db( # Get the latest dag run for each existing dag as a single query (avoid n+1 query) if len(existing_dags) == 1: # Index optimized fast path to avoid more complicated & slower groupby queryplan + existing_dag_id = list(existing_dags)[0] most_recent_subq = ( select(func.max(DagRun.execution_date).label("max_execution_date")) .where( - DagRun.dag_id == existing_dags[0], + DagRun.dag_id == existing_dag_id, or_( DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED, @@ -2950,7 +2951,7 @@ def bulk_write_to_db( ) most_recent_runs_iter = session.scalars( select(DagRun).where( - DagRun.dag_id == existing_dags[0], + DagRun.dag_id == existing_dag_id, DagRun.execution_date == most_recent_subq.c.max_execution_date, ) ) From 832000be97498bad2a7a38b5a6e424a85e268569 Mon Sep 17 00:00:00 2001 From: Josh Owen Date: Mon, 18 Sep 2023 09:24:44 -0400 Subject: [PATCH 5/8] Update dag.py --- airflow/models/dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 967b8cfec0848..534b24c7b3044 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2985,7 +2985,7 @@ def bulk_write_to_db( DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, ) ) - dag_id_to_last_automated_run = {run.dag_id: run for run in most_recent_runs_iter} + dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs} # Get number of active dagruns for all dags we are processing as a single query. 
num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session) From 45586a485180b3be6e6e4e4b1fe6d122b636c4e0 Mon Sep 17 00:00:00 2001 From: Josh Owen Date: Mon, 18 Sep 2023 11:07:49 -0400 Subject: [PATCH 6/8] added tests --- tests/models/test_dag.py | 53 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index cf84577b0b234..f122cda8f9d74 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -935,6 +935,59 @@ def test_bulk_write_to_db(self): for row in session.query(DagModel.last_parsed_time).all(): assert row[0] is not None + def test_bulk_write_to_db_single_dag(self): + """ + Test bulk_write_to_db for a single dag using the index optimized query + """ + clear_db_dags() + dags = [DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(1)] + + with assert_queries_count(5): + DAG.bulk_write_to_db(dags) + with create_session() as session: + assert {"dag-bulk-sync-0"} == {row[0] for row in session.query(DagModel.dag_id).all()} + assert { + ("dag-bulk-sync-0", "test-dag"), + } == set(session.query(DagTag.dag_id, DagTag.name).all()) + + for row in session.query(DagModel.last_parsed_time).all(): + assert row[0] is not None + + # Re-sync should do fewer queries + with assert_queries_count(8): + DAG.bulk_write_to_db(dags) + with assert_queries_count(8): + DAG.bulk_write_to_db(dags) + + def test_bulk_write_to_db_multiple_dags(self): + """ + Test bulk_write_to_db for multiple dags which does not use the index optimized query + """ + clear_db_dags() + dags = [DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(4)] + + with assert_queries_count(5): + DAG.bulk_write_to_db(dags) + with create_session() as session: + assert {"dag-bulk-sync-0", "dag-bulk-sync-1", "dag-bulk-sync-2", "dag-bulk-sync-3"} == { + row[0] for row in session.query(DagModel.dag_id).all() + } + assert { + ("dag-bulk-sync-0", "test-dag"), + ("dag-bulk-sync-1", "test-dag"), + ("dag-bulk-sync-2", "test-dag"), + ("dag-bulk-sync-3", "test-dag"), + } == set(session.query(DagTag.dag_id, DagTag.name).all()) + + for row in session.query(DagModel.last_parsed_time).all(): + assert row[0] is not None + + # Re-sync should do fewer queries + with assert_queries_count(8): + DAG.bulk_write_to_db(dags) + with assert_queries_count(8): + DAG.bulk_write_to_db(dags) + @pytest.mark.parametrize("interval", [None, "@daily"]) def test_bulk_write_to_db_interval_save_runtime(self, interval): mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags) From e85929c8df915093b25fefdfb2b69fd0d3179e63 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 17 Jan 2024 10:13:04 -0800 Subject: [PATCH 7/8] fix bug; improve queries; move to method --- airflow/models/dag.py | 92 +++++++++++++++++++++------------------- tests/models/test_dag.py | 33 ++++++++++++++ 2 files changed, 82 insertions(+), 43 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5a5602caad8fe..da29ed984241f 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -73,7 +73,7 @@ update, ) from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.orm import backref, joinedload, relationship +from sqlalchemy.orm import backref, joinedload, load_only, relationship from sqlalchemy.sql import Select, expression import airflow.templates @@ -3063,51 +3063,13 @@ def bulk_write_to_db( 
session.add(orm_dag) orm_dags.append(orm_dag) - dag_id_to_last_automated_run: dict[str, DagRun] = {} + latest_runs: dict[str, DagRun] = {} num_active_runs: dict[str, int] = {} # Skip these queries entirely if no DAGs can be scheduled to save time. if any(dag.timetable.can_be_scheduled for dag in dags): # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query) - if len(existing_dags) == 1: - # Index optimized fast path to avoid more complicated & slower groupby queryplan - existing_dag_id = list(existing_dags)[0] - last_automated_runs_subq = ( - select(func.max(DagRun.execution_date).label("max_execution_date")) - .where( - DagRun.dag_id == existing_dag_id, - or_( - DagRun.run_type == DagRunType.BACKFILL_JOB, - DagRun.run_type == DagRunType.SCHEDULED, - ), - ) - .subquery() - ) - last_automated_runs = session.scalars( - select(DagRun).where( - DagRun.dag_id == existing_dag_id, - DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, - ) - ) - else: - last_automated_runs_subq = ( - select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) - .where( - DagRun.dag_id.in_(existing_dags), - or_( - DagRun.run_type == DagRunType.BACKFILL_JOB, - DagRun.run_type == DagRunType.SCHEDULED, - ), - ) - .group_by(DagRun.dag_id) - .subquery() - ) - last_automated_runs = session.scalars( - select(DagRun).where( - DagRun.dag_id == last_automated_runs_subq.c.dag_id, - DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, - ) - ) - dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs} + query = cls._get_latest_runs_query(existing_dags, session) + latest_runs = {run.dag_id: run for run in session.scalars(query)} # Get number of active dagruns for all dags we are processing as a single query. num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session) @@ -3141,7 +3103,7 @@ def bulk_write_to_db( orm_dag.timetable_description = dag.timetable.description orm_dag.processor_subdir = processor_subdir - last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id) + last_automated_run: DagRun | None = latest_runs.get(dag.dag_id) if last_automated_run is None: last_automated_data_interval = None else: @@ -3278,6 +3240,50 @@ def bulk_write_to_db( for dag in dags: cls.bulk_write_to_db(dag.subdags, processor_subdir=processor_subdir, session=session) + @classmethod + def _get_latest_runs_query(cls, dags, session) -> Query: + """ + Query the database to retrieve the last automated run for each dag. 
+ + :param dags: dags to query + :param session: sqlalchemy session object + """ + load_only_option = load_only( + DagRun.dag_id, + DagRun.execution_date, + DagRun.data_interval_start, + DagRun.data_interval_end, + ) + if len(dags) == 1: + # Index optimized fast path to avoid more complicated & slower groupby queryplan + existing_dag_id = list(dags)[0].dag_id + last_automated_runs_subq = ( + select(func.max(DagRun.execution_date).label("max_execution_date")) + .where( + DagRun.dag_id == existing_dag_id, + DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)), + ) + .subquery() + ) + query = select(DagRun).where( + DagRun.dag_id == existing_dag_id, DagRun.execution_date == last_automated_runs_subq + ) + else: + last_automated_runs_subq = ( + select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) + .where( + DagRun.dag_id.in_(dags), + DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)), + ) + .group_by(DagRun.dag_id) + .subquery() + ) + query = select(DagRun).where( + DagRun.dag_id == last_automated_runs_subq.c.dag_id, + DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, + ) + return query.options(load_only_option) + @provide_session def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION): """ diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f50684e9cb467..7c337ed965c28 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -4135,3 +4135,36 @@ def test_validate_setup_teardown_trigger_rule(self): Exception, match="Setup tasks must be followed with trigger rule ALL_SUCCESS." ): dag.validate_setup_teardown() + + +def test_get_latest_runs_query_one_dag(dag_maker, session): + with dag_maker(dag_id="dag1") as dag1: + ... + query = DAG._get_latest_runs_query(dags=[dag1], session=session) + actual = [x.strip() for x in str(query.compile()).splitlines()] + expected = [ + "SELECT dag_run.id, dag_run.dag_id, dag_run.execution_date, dag_run.data_interval_start, dag_run.data_interval_end", + "FROM dag_run", + "WHERE dag_run.dag_id = :dag_id_1 AND dag_run.execution_date = (SELECT max(dag_run.execution_date) AS max_execution_date", + "FROM dag_run", + "WHERE dag_run.dag_id = :dag_id_2 AND dag_run.run_type IN (__[POSTCOMPILE_run_type_1]))", + ] + assert actual == expected + + +def test_get_latest_runs_query_two_dags(dag_maker, session): + with dag_maker(dag_id="dag1") as dag1: + ... + with dag_maker(dag_id="dag2") as dag2: + ... 
+    query = DAG._get_latest_runs_query(dags=[dag1, dag2], session=session)
+    actual = [x.strip() for x in str(query.compile()).splitlines()]
+    print("\n".join(actual))
+    expected = [
+        "SELECT dag_run.id, dag_run.dag_id, dag_run.execution_date, dag_run.data_interval_start, dag_run.data_interval_end",
+        "FROM dag_run, (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS max_execution_date",
+        "FROM dag_run",
+        "WHERE dag_run.dag_id IN (__[POSTCOMPILE_dag_id_1]) AND dag_run.run_type IN (__[POSTCOMPILE_run_type_1]) GROUP BY dag_run.dag_id) AS anon_1",
+        "WHERE dag_run.dag_id = anon_1.dag_id AND dag_run.execution_date = anon_1.max_execution_date",
+    ]
+    assert actual == expected

From 324b3dd0e3a5444e159ef2d2be72f9d00372fedf Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Wed, 17 Jan 2024 10:59:28 -0800
Subject: [PATCH 8/8] remove var

---
 airflow/models/dag.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index da29ed984241f..622c5b2a11b95 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3248,12 +3248,6 @@ def _get_latest_runs_query(cls, dags, session) -> Query:
         :param dags: dags to query
         :param session: sqlalchemy session object
         """
-        load_only_option = load_only(
-            DagRun.dag_id,
-            DagRun.execution_date,
-            DagRun.data_interval_start,
-            DagRun.data_interval_end,
-        )
         if len(dags) == 1:
             # Index optimized fast path to avoid more complicated & slower groupby queryplan
             existing_dag_id = list(dags)[0].dag_id
@@ -3282,7 +3276,14 @@ def _get_latest_runs_query(cls, dags, session) -> Query:
                 DagRun.dag_id == last_automated_runs_subq.c.dag_id,
                 DagRun.execution_date == last_automated_runs_subq.c.max_execution_date,
             )
-        return query.options(load_only_option)
+        return query.options(
+            load_only(
+                DagRun.dag_id,
+                DagRun.execution_date,
+                DagRun.data_interval_start,
+                DagRun.data_interval_end,
+            )
+        )

     @provide_session
     def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION):
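
Note on the query optimization carried through the patches above. The sketch below is a minimal, standalone illustration and not Airflow's actual model: the dag_run table definition is simplified, the helper name latest_automated_run_query is made up for this example, and the "scheduled"/"backfill" strings are assumed stand-ins for DagRunType.SCHEDULED and DagRunType.BACKFILL_JOB. It only shows the two query shapes _get_latest_runs_query chooses between: a scalar subquery constrained to a single dag_id (the index-optimized fast path), and a GROUP BY subquery joined back to dag_run for the general case.

from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table, func, select

metadata = MetaData()
dag_run = Table(
    "dag_run",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("dag_id", String(250)),
    Column("run_type", String(50)),
    Column("execution_date", DateTime),
)


def latest_automated_run_query(dag_ids):
    """Build a SELECT returning the most recent automated run per dag_id."""
    is_automated = dag_run.c.run_type.in_(("scheduled", "backfill"))
    if len(dag_ids) == 1:
        # Fast path: a scalar subquery over one dag_id needs no GROUP BY, so the
        # planner can typically answer max(execution_date) from an index on
        # (dag_id, execution_date) where one exists.
        max_date = (
            select(func.max(dag_run.c.execution_date))
            .where(dag_run.c.dag_id == dag_ids[0], is_automated)
            .scalar_subquery()
        )
        return select(dag_run).where(
            dag_run.c.dag_id == dag_ids[0], dag_run.c.execution_date == max_date
        )
    # General case: compute each dag's latest execution_date with GROUP BY, then
    # join the aggregate back to dag_run to fetch the full rows.
    per_dag_max = (
        select(dag_run.c.dag_id, func.max(dag_run.c.execution_date).label("max_execution_date"))
        .where(dag_run.c.dag_id.in_(dag_ids), is_automated)
        .group_by(dag_run.c.dag_id)
        .subquery()
    )
    return select(dag_run).where(
        dag_run.c.dag_id == per_dag_max.c.dag_id,
        dag_run.c.execution_date == per_dag_max.c.max_execution_date,
    )


print(latest_automated_run_query(["dag1"]))          # renders without GROUP BY
print(latest_automated_run_query(["dag1", "dag2"]))  # renders with GROUP BY plus a join back to dag_run

Printing both variants makes the difference visible: the single-dag form compiles to a plain scalar subquery with no GROUP BY, which is the "index optimized fast path" the commit messages refer to, while the multi-dag form carries the grouped subquery and the extra join.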
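
On the load_only() part of patches 7 and 8: the snippet below is a small self-contained example with a hypothetical mapped class (not Airflow's real DagRun model) showing why the expected SQL in the new tests starts with "SELECT dag_run.id, dag_run.dag_id, dag_run.execution_date, dag_run.data_interval_start, dag_run.data_interval_end". load_only() prunes the SELECT list to the primary key plus the attributes passed to it, and the pruning is already visible when the statement is compiled, which is what the tests assert against.

from datetime import datetime
from typing import Optional

from sqlalchemy import String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, load_only, mapped_column


class Base(DeclarativeBase):
    pass


class DagRun(Base):
    __tablename__ = "dag_run"

    id: Mapped[int] = mapped_column(primary_key=True)
    dag_id: Mapped[str] = mapped_column(String(250))
    run_type: Mapped[str] = mapped_column(String(50))
    execution_date: Mapped[datetime] = mapped_column()
    data_interval_start: Mapped[Optional[datetime]] = mapped_column()
    data_interval_end: Mapped[Optional[datetime]] = mapped_column()


query = select(DagRun).options(
    load_only(DagRun.dag_id, DagRun.execution_date, DagRun.data_interval_start, DagRun.data_interval_end)
)
# run_type is mapped but not listed in load_only(), so it is absent from the
# compiled SELECT; the primary key column is always kept.
print(str(query.compile()))

Patch 8 only inlines the same load_only() call into the return statement instead of keeping it in a local variable; the compiled SQL, and therefore the test expectations, stay the same.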