From dd67bf5933a82cf74d672cc63e76884088220b24 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 29 May 2025 14:32:11 -0700 Subject: [PATCH] Remove filtering by last dag run state in patch dags endpoint (#51176) I don't think this is used. The query is complicated so let's just remove it. Meanwhile, it's a bit weird to have an endpoint with pagination that filters over a mutable characteristic -- such as last run state. (cherry picked from commit 42003357cc31ee6abda6233446d4e9d5ed78dfb2) --- .../src/airflow/api_fastapi/common/db/dags.py | 92 ------------------- .../openapi/v1-rest-api-generated.yaml | 8 -- .../core_api/routes/public/dags.py | 8 +- .../airflow/ui/openapi-gen/queries/queries.ts | 5 - .../ui/openapi-gen/requests/services.gen.ts | 2 - .../ui/openapi-gen/requests/types.gen.ts | 1 - .../core_api/routes/public/test_dags.py | 8 +- 7 files changed, 5 insertions(+), 119 deletions(-) delete mode 100644 airflow-core/src/airflow/api_fastapi/common/db/dags.py diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dags.py b/airflow-core/src/airflow/api_fastapi/common/db/dags.py deleted file mode 100644 index d9d2b66e6f7a5..0000000000000 --- a/airflow-core/src/airflow/api_fastapi/common/db/dags.py +++ /dev/null @@ -1,92 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from sqlalchemy import func, select - -if TYPE_CHECKING: - from sqlalchemy.sql import Select - -from airflow.models.dag import DagModel -from airflow.models.dagrun import DagRun - - -def generate_dag_with_latest_run_query(dag_runs_cte: Select | None = None) -> Select: - latest_dag_run_per_dag_id_cte = ( - select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date")) - .where() - .group_by(DagRun.dag_id) - .cte() - ) - - dags_select_with_latest_dag_run = ( - select(DagModel) - .join( - latest_dag_run_per_dag_id_cte, - DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id, - isouter=True, - ) - .join( - DagRun, - DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date - and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id, - isouter=True, - ) - .order_by(DagModel.dag_id) - ) - - if dag_runs_cte is None: - return dags_select_with_latest_dag_run - - dag_run_filters_cte = ( - select(DagModel.dag_id) - .join( - dag_runs_cte, - DagModel.dag_id == dag_runs_cte.c.dag_id, - ) - .join( - DagRun, - DagRun.dag_id == dag_runs_cte.c.dag_id, - ) - .group_by(DagModel.dag_id) - .cte() - ) - - dags_with_latest_and_filtered_runs = ( - select(DagModel) - .join( - dag_run_filters_cte, - dag_run_filters_cte.c.dag_id == DagModel.dag_id, - ) - .join( - latest_dag_run_per_dag_id_cte, - DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id, - isouter=True, - ) - .join( - DagRun, - DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date - and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id, - isouter=True, - ) - .order_by(DagModel.dag_id) - ) - - return dags_with_latest_and_filtered_runs diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index 53279035bc71d..ff6725b266b55 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -2844,14 +2844,6 @@ paths: - type: boolean - type: 'null' title: Paused - - name: last_dag_run_state - in: query - required: false - schema: - anyOf: - - $ref: '#/components/schemas/DagRunState' - - type: 'null' - title: Last Dag Run State requestBody: required: true content: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py index f55c5763701ef..c078f2baf43e0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py @@ -31,7 +31,6 @@ apply_filters_to_select, paginated_select, ) -from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query from airflow.api_fastapi.common.parameters import ( FilterOptionEnum, FilterParam, @@ -301,7 +300,6 @@ def patch_dags( dag_id_pattern: QueryDagIdPatternSearchWithNone, exclude_stale: QueryExcludeStaleFilter, paused: QueryPausedFilter, - last_dag_run_state: QueryLastDagRunStateFilter, editable_dags_filter: EditableDagsFilterDep, session: SessionDep, update_mask: list[str] | None = Query(None), @@ -318,18 +316,14 @@ def patch_dags( except ValidationError as e: raise RequestValidationError(errors=e.errors()) - # todo: this is not used? - update_mask = ["is_paused"] - dags_select, total_entries = paginated_select( - statement=generate_dag_with_latest_run_query(), + statement=select(DagModel), filters=[ exclude_stale, paused, dag_id_pattern, tags, owners, - last_dag_run_state, editable_dags_filter, ], order_by=None, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index e1b770b8e7dac..c604c40054df0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -3876,7 +3876,6 @@ export const useDagRunServicePatchDagRun = < * @param data.dagIdPattern * @param data.excludeStale * @param data.paused - * @param data.lastDagRunState * @returns DAGCollectionResponse Successful Response * @throws ApiError */ @@ -3892,7 +3891,6 @@ export const useDagServicePatchDags = < { dagIdPattern?: string; excludeStale?: boolean; - lastDagRunState?: DagRunState; limit?: number; offset?: number; owners?: string[]; @@ -3913,7 +3911,6 @@ export const useDagServicePatchDags = < { dagIdPattern?: string; excludeStale?: boolean; - lastDagRunState?: DagRunState; limit?: number; offset?: number; owners?: string[]; @@ -3928,7 +3925,6 @@ export const useDagServicePatchDags = < mutationFn: ({ dagIdPattern, excludeStale, - lastDagRunState, limit, offset, owners, @@ -3941,7 +3937,6 @@ export const useDagServicePatchDags = < DagService.patchDags({ dagIdPattern, excludeStale, - lastDagRunState, limit, offset, owners, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 711c91093d8c0..48f41fb0d4e7d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1570,7 +1570,6 @@ export class DagService { * @param data.dagIdPattern * @param data.excludeStale * @param data.paused - * @param data.lastDagRunState * @returns DAGCollectionResponse Successful Response * @throws ApiError */ @@ -1588,7 +1587,6 @@ export class DagService { dag_id_pattern: data.dagIdPattern, exclude_stale: data.excludeStale, paused: data.paused, - last_dag_run_state: data.lastDagRunState, }, body: data.requestBody, mediaType: "application/json", diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index caca462b5831b..c45310973f87f 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2098,7 +2098,6 @@ export type GetDagsResponse = DAGCollectionResponse; export type PatchDagsData = { dagIdPattern?: string | null; excludeStale?: boolean; - lastDagRunState?: DagRunState | null; limit?: number; offset?: number; owners?: Array; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index 8e7ee00b83553..f207de91d2e41 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -356,9 +356,9 @@ def test_patch_dags( assert response.status_code == expected_status_code if expected_status_code == 200: body = response.json() - assert [dag["dag_id"] for dag in body["dags"]] == expected_ids - paused_dag_ids = [dag["dag_id"] for dag in body["dags"] if dag["is_paused"]] - assert paused_dag_ids == expected_paused_ids + assert {dag["dag_id"] for dag in body["dags"]} == set(expected_ids) + paused_dag_ids = {dag["dag_id"] for dag in body["dags"] if dag["is_paused"]} + assert paused_dag_ids == set(expected_paused_ids) check_last_log(session, dag_id=DAG1_ID, event="patch_dag", logical_date=None) @mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids") @@ -371,7 +371,7 @@ def test_patch_dags_should_call_authorized_dag_ids(self, mock_get_authorized_dag assert response.status_code == 200 body = response.json() - assert [dag["dag_id"] for dag in body["dags"]] == [DAG1_ID, DAG2_ID] + assert {dag["dag_id"] for dag in body["dags"]} == {DAG1_ID, DAG2_ID} def test_patch_dags_should_response_401(self, unauthenticated_test_client): response = unauthenticated_test_client.patch("/dags", json={"is_paused": True})