From 59f088dd5eaf96c29ff99c8097036789a76871e7 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 28 May 2025 10:54:34 -0700 Subject: [PATCH] [v3-0-test] Fix get dags query to not have join explosion (#50984) Previously it was missing dag_id filter, but joining on start date would still be problematic. In this PR I refactor the query a bit so that all joins are guaranteed 1-1. To get "latest" DagRun I sort by the DagRun.id column. This is a simplifying assumption that would be more performant than sorting by start_date, since there could be more than 1 dag run with a given start date. (cherry picked from commit b994bb23a345ca33880cc2aad71d87be1d6e3612) Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- .../core_api/routes/public/dags.py | 43 +++++++++++++++---- .../core_api/routes/public/test_dags.py | 8 ++-- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py index 84cc2493fea74..f55c5763701ef 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py @@ -22,12 +22,13 @@ from fastapi import Depends, HTTPException, Query, Response, status from fastapi.exceptions import RequestValidationError from pydantic import ValidationError -from sqlalchemy import select, update +from sqlalchemy import func, null, select, update from airflow.api.common import delete_dag as delete_dag_module from airflow.api_fastapi.common.dagbag import DagBagDep from airflow.api_fastapi.common.db.common import ( SessionDep, + apply_filters_to_select, paginated_select, ) from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query @@ -115,22 +116,47 @@ def get_dags( session: SessionDep, ) -> DAGCollectionResponse: """Get all DAGs.""" - dag_runs_select = None + query = select(DagModel) - if dag_run_state.value or dag_run_start_date_range.is_active() or dag_run_end_date_range.is_active(): - dag_runs_select, _ = paginated_select( - statement=select(DagRun), + max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption + select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id")) + .where(DagRun.start_date.is_not(null())) + .group_by(DagRun.dag_id) + .subquery(name="mrq") + ) + + has_max_run_filter = ( + dag_run_state.value + or last_dag_run_state.value + or dag_run_start_date_range.is_active() + or dag_run_end_date_range.is_active() + ) + + if has_max_run_filter or order_by.value in ( + "last_run_state", + "last_run_start_date", + "-last_run_state", + "-last_run_start_date", + ): + query = query.join( + max_run_id_query, + DagModel.dag_id == max_run_id_query.c.dag_id, + isouter=True, + ).join(DagRun, DagRun.id == max_run_id_query.c.max_dag_run_id, isouter=True) + + if has_max_run_filter: + query = apply_filters_to_select( + statement=query, filters=[ dag_run_start_date_range, dag_run_end_date_range, dag_run_state, + last_dag_run_state, ], - session=session, ) - dag_runs_select = dag_runs_select.cte() dags_select, total_entries = paginated_select( - statement=generate_dag_with_latest_run_query(dag_runs_select), + statement=query, filters=[ exclude_stale, paused, @@ -138,7 +164,6 @@ def get_dags( dag_display_name_pattern, tags, owners, - last_dag_run_state, readable_dags_filter, ], order_by=order_by, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index def4656f850ba..8e7ee00b83553 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -162,7 +162,7 @@ class TestGetDags(TestDagEndpoint): ({"last_dag_run_state": "success", "exclude_stale": False}, 1, [DAG3_ID]), ({"last_dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]), ({"dag_run_state": "failed"}, 1, [DAG1_ID]), - ({"dag_run_state": "failed", "exclude_stale": False}, 2, [DAG1_ID, DAG3_ID]), + ({"dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]), ( {"dag_run_start_date_gte": DAG3_START_DATE_2.isoformat(), "exclude_stale": False}, 1, @@ -210,10 +210,10 @@ class TestGetDags(TestDagEndpoint): "dag_run_state": "failed", "exclude_stale": False, }, - 1, - [DAG3_ID], + 0, + [], ), - # # Sort + # Sort ({"order_by": "-dag_id"}, 2, [DAG2_ID, DAG1_ID]), ({"order_by": "-dag_display_name"}, 2, [DAG2_ID, DAG1_ID]), ({"order_by": "dag_display_name"}, 2, [DAG1_ID, DAG2_ID]),