From e50c43ec0c812bb5c306d9d5f32cf805f8fb8a86 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sun, 13 Oct 2024 08:39:25 +0530 Subject: [PATCH 01/13] add modify_dag_run --- .../endpoints/dag_run_endpoint.py | 1 + airflow/api_fastapi/openapi/v1-generated.yaml | 79 +++++++++++++++++++ airflow/api_fastapi/serializers/dag_run.py | 15 ++++ airflow/api_fastapi/views/public/dag_run.py | 18 ++++- airflow/ui/openapi-gen/queries/common.ts | 3 + airflow/ui/openapi-gen/queries/queries.ts | 53 ++++++++++++- .../ui/openapi-gen/requests/schemas.gen.ts | 19 +++++ .../ui/openapi-gen/requests/services.gen.ts | 34 ++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 49 ++++++++++++ 9 files changed, 269 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index a862b7c969503..223fa5aca5168 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -372,6 +372,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists") +@mark_fastapi_migration_done @security.requires_access_dag("PUT", DagAccessEntity.RUN) @provide_session @action_logging diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 7debfbb1008af..3ea10532a18da 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -679,6 +679,68 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + patch: + tags: + - DagRun + summary: Modify Dag Run + description: Modify a DAG Run. + operationId: modify_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunPatchBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGRunResponse' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: ConnectionResponse: @@ -1147,6 +1209,23 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. + DAGRunModifyFormStates: + type: string + enum: + - queued + - success + - failed + title: DAGRunModifyFormStates + description: Enum for DAG Run states when updating a DAG Run. + DAGRunPatchBody: + properties: + state: + $ref: '#/components/schemas/DAGRunModifyFormStates' + type: object + required: + - state + title: DAGRunPatchBody + description: DAG Run Serializer for PATCH requests. DAGRunResponse: properties: run_id: diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index 4622fac645c07..c35e2be08d4d7 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -18,6 +18,7 @@ from __future__ import annotations from datetime import datetime +from enum import Enum from pydantic import BaseModel, Field @@ -25,6 +26,20 @@ from airflow.utils.types import DagRunTriggeredByType, DagRunType +class DAGRunModifyFormStates(str, Enum): + """Enum for DAG Run states when updating a DAG Run.""" + + QUEUED = DagRunState.QUEUED + SUCCESS = DagRunState.SUCCESS + FAILED = DagRunState.FAILED + + +class DAGRunPatchBody(BaseModel): + """DAG Run Serializer for PATCH requests.""" + + state: DAGRunModifyFormStates + + class DAGRunResponse(BaseModel): """DAG Run serializer for responses.""" diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index d39fb6f2f331c..531e08df28adf 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -24,7 +24,7 @@ from airflow.api_fastapi.db.common import get_session from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.serializers.dag_run import DAGRunResponse +from airflow.api_fastapi.serializers.dag_run import DAGRunPatchBody, DAGRunResponse from airflow.api_fastapi.views.router import AirflowRouter from airflow.models import DagRun @@ -42,3 +42,19 @@ async def get_dag_run( ) return DAGRunResponse.model_validate(dag_run, from_attributes=True) + + +@dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) +async def modify_dag_run( + dag_id: str, dag_run_id: str, state: DAGRunPatchBody, session: Annotated[Session, Depends(get_session)] +) -> DAGRunResponse: + """Modify a DAG Run.""" + dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) + if dag_run is None: + raise HTTPException( + 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" + ) + + setattr(dag_run, "state", state.state) + + return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index aaff196c0791d..62849843a2ba9 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -191,6 +191,9 @@ export type DagServicePatchDagsMutationResult = Awaited< export type DagServicePatchDagMutationResult = Awaited< ReturnType >; +export type DagRunServiceModifyDagRunMutationResult = Awaited< + ReturnType +>; export type ConnectionServiceDeleteConnectionMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 19bb17b342a84..50062182df1d1 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -14,7 +14,11 @@ import { DashboardService, VariableService, } from "../requests/services.gen"; -import { DAGPatchBody, DagRunState } from "../requests/types.gen"; +import { + DAGPatchBody, + DAGRunPatchBody, + DagRunState, +} from "../requests/types.gen"; import * as Common from "./common"; /** @@ -428,6 +432,53 @@ export const useDagServicePatchDag = < }) as unknown as Promise, ...options, }); +/** + * Modify Dag Run + * Modify a DAG Run. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceModifyDagRun = < + TData = Common.DagRunServiceModifyDagRunMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; + }, + TContext + >({ + mutationFn: ({ dagId, dagRunId, requestBody }) => + DagRunService.modifyDagRun({ + dagId, + dagRunId, + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Delete Connection * Delete a connection entry. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 18df5284651b7..67798b66521fa 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -784,6 +784,25 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; +export const $DAGRunModifyFormStates = { + type: "string", + enum: ["queued", "success", "failed"], + title: "DAGRunModifyFormStates", + description: "Enum for DAG Run states when updating a DAG Run.", +} as const; + +export const $DAGRunPatchBody = { + properties: { + state: { + $ref: "#/components/schemas/DAGRunModifyFormStates", + }, + }, + type: "object", + required: ["state"], + title: "DAGRunPatchBody", + description: "DAG Run Serializer for PATCH requests.", +} as const; + export const $DAGRunResponse = { properties: { run_id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 9a126aef25fbc..1fc23b32f4398 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -27,6 +27,8 @@ import type { GetVariableResponse, GetDagRunData, GetDagRunResponse, + ModifyDagRunData, + ModifyDagRunResponse, } from "./types.gen"; export class AssetService { @@ -391,4 +393,36 @@ export class DagRunService { }, }); } + + /** + * Modify Dag Run + * Modify a DAG Run. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ + public static modifyDagRun( + data: ModifyDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "PATCH", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + }, + body: data.requestBody, + mediaType: "application/json", + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 45bfa51aec9c4..c7eb95f54c596 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -120,6 +120,18 @@ export type DAGResponse = { readonly file_token: string; }; +/** + * Enum for DAG Run states when updating a DAG Run. + */ +export type DAGRunModifyFormStates = "queued" | "success" | "failed"; + +/** + * DAG Run Serializer for PATCH requests. + */ +export type DAGRunPatchBody = { + state: DAGRunModifyFormStates; +}; + /** * DAG Run serializer for responses. */ @@ -355,6 +367,14 @@ export type GetDagRunData = { export type GetDagRunResponse = DAGRunResponse; +export type ModifyDagRunData = { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; +}; + +export type ModifyDagRunResponse = DAGRunResponse; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -657,5 +677,34 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + patch: { + req: ModifyDagRunData; + res: { + /** + * Successful Response + */ + 200: DAGRunResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; }; }; From 4a62ed5000f34957ce3a68b5021ccf69eedf2495 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sun, 13 Oct 2024 09:08:24 +0530 Subject: [PATCH 02/13] add tests --- .../api_fastapi/views/public/test_dag_run.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/api_fastapi/views/public/test_dag_run.py b/tests/api_fastapi/views/public/test_dag_run.py index 176ae07d3fadb..bb8b773c0afaa 100644 --- a/tests/api_fastapi/views/public/test_dag_run.py +++ b/tests/api_fastapi/views/public/test_dag_run.py @@ -136,3 +136,37 @@ def test_get_dag_run_not_found(test_client): assert response.status_code == 404 body = response.json() assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" + + +class TestModifyDagRun: + @pytest.mark.parametrize( + "dag_id, run_id, state, response_state", + [ + (DAG1_ID, DAG1_RUN1_ID, DagRunState.FAILED, DagRunState.FAILED), + (DAG1_ID, DAG1_RUN2_ID, DagRunState.SUCCESS, DagRunState.SUCCESS), + (DAG2_ID, DAG2_RUN1_ID, DagRunState.QUEUED, DagRunState.QUEUED), + ], + ) + def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state): + response = test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) + assert response.status_code == 200 + body = response.json() + assert body["dag_id"] == dag_id + assert body["run_id"] == run_id + assert body["state"] == response_state + + def test_modify_dag_run_not_found(self, test_client): + response = test_client.patch( + f"/public/dags/{DAG1_ID}/dagRuns/invalid", json={"state": DagRunState.SUCCESS} + ) + assert response.status_code == 404 + body = response.json() + assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" + + def test_modify_dag_run_bad_request(self, test_client): + response = test_client.patch( + f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", json={"state": "running"} + ) + assert response.status_code == 422 + body = response.json() + assert body["detail"][0]["msg"] == "Input should be 'queued', 'success' or 'failed'" From 444956d14a7035378189efb6b3a13a29eff102c0 Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Wed, 16 Oct 2024 01:33:07 +0530 Subject: [PATCH 03/13] Update airflow/api_fastapi/views/public/dag_run.py --- airflow/api_fastapi/views/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/views/public/dag_run.py b/airflow/api_fastapi/views/public/dag_run.py index 89fd51fd95eb7..335f1f0afe338 100644 --- a/airflow/api_fastapi/views/public/dag_run.py +++ b/airflow/api_fastapi/views/public/dag_run.py @@ -45,7 +45,7 @@ async def get_dag_run( @dag_run_router.delete( - "/{dag_run_id}", status_code=204, responses=create_openapi_http_exception_doc([401, 403, 404]) + "/{dag_run_id}", status_code=204, responses=create_openapi_http_exception_doc([400, 401, 403, 404]) ) async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]): """Delete a DAG Run entry.""" From 69b143923a2e10fd0bdf30669d045caeb84e52c4 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 16 Oct 2024 02:02:06 +0530 Subject: [PATCH 04/13] fix --- airflow/api_fastapi/openapi/v1-generated.yaml | 6 ++++++ airflow/ui/openapi-gen/requests/services.gen.ts | 1 + airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++++ 3 files changed, 11 insertions(+) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index e645d006b2a4e..ca650a1c484c9 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -914,6 +914,12 @@ paths: responses: '204': description: Successful Response + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request '401': content: application/json: diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index cf84d1ecbcb92..ce1c4dbf47bff 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -540,6 +540,7 @@ export class DagRunService { dag_run_id: data.dagRunId, }, errors: { + 400: "Bad Request", 401: "Unauthorized", 403: "Forbidden", 404: "Not Found", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 5c1446978eaa1..01cef7f5c6193 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -887,6 +887,10 @@ export type $OpenApiTs = { * Successful Response */ 204: void; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; /** * Unauthorized */ From 521c6fc9a36f11c4900285809f2e1a895fbd862b Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Wed, 16 Oct 2024 22:19:25 +0530 Subject: [PATCH 05/13] Update airflow/api_fastapi/routes/public/dag_run.py Co-authored-by: Pierre Jeambrun --- airflow/api_fastapi/routes/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/routes/public/dag_run.py b/airflow/api_fastapi/routes/public/dag_run.py index f5489667a4aec..4c895fac6e48b 100644 --- a/airflow/api_fastapi/routes/public/dag_run.py +++ b/airflow/api_fastapi/routes/public/dag_run.py @@ -60,7 +60,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio @dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) -async def modify_dag_run( +async def update_dag_run_state( dag_id: str, dag_run_id: str, state: DAGRunPatchBody, session: Annotated[Session, Depends(get_session)] ) -> DAGRunResponse: """Modify a DAG Run.""" From 7e00f2f7b72cd771f2c43266394b7f6652e415c0 Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Wed, 16 Oct 2024 22:35:56 +0530 Subject: [PATCH 06/13] Update airflow/api_fastapi/serializers/dag_run.py Co-authored-by: Pierre Jeambrun --- airflow/api_fastapi/serializers/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/serializers/dag_run.py b/airflow/api_fastapi/serializers/dag_run.py index c35e2be08d4d7..e66f73c4d01c4 100644 --- a/airflow/api_fastapi/serializers/dag_run.py +++ b/airflow/api_fastapi/serializers/dag_run.py @@ -26,7 +26,7 @@ from airflow.utils.types import DagRunTriggeredByType, DagRunType -class DAGRunModifyFormStates(str, Enum): +class DAGRunModifyStates(str, Enum): """Enum for DAG Run states when updating a DAG Run.""" QUEUED = DagRunState.QUEUED From 816dcb8554a90f99eb05245f3ff3069e32df74f0 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 19 Oct 2024 11:35:48 +0530 Subject: [PATCH 07/13] use dagbag --- .../core_api/routes/public/dag_run.py | 36 ++++++++++++++++--- .../core_api/routes/public/test_dag_run.py | 24 ++++++------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 8fcc339219c5f..1ff64d567a470 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -17,16 +17,25 @@ from __future__ import annotations -from fastapi import Depends, HTTPException +from fastapi import Depends, HTTPException, Request from sqlalchemy import select from sqlalchemy.orm import Session from typing_extensions import Annotated +from airflow.api.common.mark_tasks import ( + set_dag_run_state_to_failed, + set_dag_run_state_to_queued, + set_dag_run_state_to_success, +) from airflow.api_fastapi.common.db.common import get_session from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunPatchBody, DAGRunResponse -from airflow.models import DagRun +from airflow.api_fastapi.core_api.serializers.dag_run import ( + DAGRunModifyStates, + DAGRunPatchBody, + DAGRunResponse, +) +from airflow.models import DAG, DagRun dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") @@ -61,7 +70,11 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio @dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) async def update_dag_run_state( - dag_id: str, dag_run_id: str, state: DAGRunPatchBody, session: Annotated[Session, Depends(get_session)] + dag_id: str, + dag_run_id: str, + state: DAGRunPatchBody, + session: Annotated[Session, Depends(get_session)], + request: Request, ) -> DAGRunResponse: """Modify a DAG Run.""" dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) @@ -69,7 +82,20 @@ async def update_dag_run_state( raise HTTPException( 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" ) + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(404, f"Dag with id {dag_id} was not found") + print("State is ", state) + print("stats equals", state == DAGRunModifyStates.SUCCESS) + if state.state == DAGRunModifyStates.SUCCESS: + print("Setting state to success") + set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) + elif state.state == DAGRunModifyStates.QUEUED: + print("setting state to queued") + set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) + else: + set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) - setattr(dag_run, "state", state.state) + dag_run = session.get(DagRun, dag_run.id) return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 960eacc48c7a5..a04c7bbfdc96f 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -138,18 +138,6 @@ def test_get_dag_run_not_found(self, test_client): assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" -class TestDeleteDagRun: - def test_delete_dag_run(self, test_client): - response = test_client.delete(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}") - assert response.status_code == 204 - - def test_delete_dag_run_not_found(self, test_client): - response = test_client.delete(f"/public/dags/{DAG1_ID}/dagRuns/invalid") - assert response.status_code == 404 - body = response.json() - assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" - - class TestModifyDagRun: @pytest.mark.parametrize( "dag_id, run_id, state, response_state", @@ -182,3 +170,15 @@ def test_modify_dag_run_bad_request(self, test_client): assert response.status_code == 422 body = response.json() assert body["detail"][0]["msg"] == "Input should be 'queued', 'success' or 'failed'" + + +class TestDeleteDagRun: + def test_delete_dag_run(self, test_client): + response = test_client.delete(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}") + assert response.status_code == 204 + + def test_delete_dag_run_not_found(self, test_client): + response = test_client.delete(f"/public/dags/{DAG1_ID}/dagRuns/invalid") + assert response.status_code == 404 + body = response.json() + assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" From f1dad1c55e3c9440386eaf05dc52e8b79618feed Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 19 Oct 2024 11:42:49 +0530 Subject: [PATCH 08/13] replace patch with put --- .../core_api/openapi/v1-generated.yaml | 2 +- .../core_api/routes/public/dag_run.py | 4 +- airflow/ui/openapi-gen/queries/common.ts | 6 +- airflow/ui/openapi-gen/queries/queries.ts | 94 +++++++++---------- .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 6 files changed, 54 insertions(+), 56 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 65caf9b7b2fe7..aea3683566acb 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -997,7 +997,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - patch: + put: tags: - DagRun summary: Update Dag Run State diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 1ff64d567a470..bda4f55881f5d 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -68,7 +68,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio session.delete(dag_run) -@dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) +@dag_run_router.put("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) async def update_dag_run_state( dag_id: str, dag_run_id: str, @@ -88,10 +88,8 @@ async def update_dag_run_state( print("State is ", state) print("stats equals", state == DAGRunModifyStates.SUCCESS) if state.state == DAGRunModifyStates.SUCCESS: - print("Setting state to success") set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) elif state.state == DAGRunModifyStates.QUEUED: - print("setting state to queued") set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) else: set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index f139e5bc8da8c..137ee2ec01120 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -248,6 +248,9 @@ export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array) => [ export type VariableServicePostVariableMutationResult = Awaited< ReturnType >; +export type DagRunServiceUpdateDagRunStateMutationResult = Awaited< + ReturnType +>; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; @@ -257,9 +260,6 @@ export type DagServicePatchDagMutationResult = Awaited< export type VariableServicePatchVariableMutationResult = Awaited< ReturnType >; -export type DagRunServiceUpdateDagRunStateMutationResult = Awaited< - ReturnType ->; export type DagServiceDeleteDagMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e7b0fb27d5746..39485359c3e9f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -430,6 +430,53 @@ export const useVariableServicePostVariable = < }) as unknown as Promise, ...options, }); +/** + * Update Dag Run State + * Modify a DAG Run. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceUpdateDagRunState = < + TData = Common.DagRunServiceUpdateDagRunStateMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; + }, + TContext + >({ + mutationFn: ({ dagId, dagRunId, requestBody }) => + DagRunService.updateDagRunState({ + dagId, + dagRunId, + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. @@ -610,53 +657,6 @@ export const useVariableServicePatchVariable = < }) as unknown as Promise, ...options, }); -/** - * Update Dag Run State - * Modify a DAG Run. - * @param data The data for the request. - * @param data.dagId - * @param data.dagRunId - * @param data.requestBody - * @returns DAGRunResponse Successful Response - * @throws ApiError - */ -export const useDagRunServiceUpdateDagRunState = < - TData = Common.DagRunServiceUpdateDagRunStateMutationResult, - TError = unknown, - TContext = unknown, ->( - options?: Omit< - UseMutationOptions< - TData, - TError, - { - dagId: string; - dagRunId: string; - requestBody: DAGRunPatchBody; - }, - TContext - >, - "mutationFn" - >, -) => - useMutation< - TData, - TError, - { - dagId: string; - dagRunId: string; - requestBody: DAGRunPatchBody; - }, - TContext - >({ - mutationFn: ({ dagId, dagRunId, requestBody }) => - DagRunService.updateDagRunState({ - dagId, - dagRunId, - requestBody, - }) as unknown as Promise, - ...options, - }); /** * Delete Dag * Delete the specific DAG. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index bbdca65a90ac3..fbf1663ff882e 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -594,7 +594,7 @@ export class DagRunService { data: UpdateDagRunStateData, ): CancelablePromise { return __request(OpenAPI, { - method: "PATCH", + method: "PUT", url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", path: { dag_id: data.dagId, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 4d522401e7cd3..31506f5022513 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -946,7 +946,7 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; - patch: { + put: { req: UpdateDagRunStateData; res: { /** From 28b6db89ac2a3a17cda2f346cafc88db39719ecb Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 19 Oct 2024 11:54:49 +0530 Subject: [PATCH 09/13] refactor --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index bda4f55881f5d..c7f4f9e38e595 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -82,11 +82,12 @@ async def update_dag_run_state( raise HTTPException( 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" ) + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: raise HTTPException(404, f"Dag with id {dag_id} was not found") - print("State is ", state) - print("stats equals", state == DAGRunModifyStates.SUCCESS) + if state.state == DAGRunModifyStates.SUCCESS: set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) elif state.state == DAGRunModifyStates.QUEUED: From 7d66c482695a6a36587d2d72b91711dfa0f4d3ee Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 19 Oct 2024 13:21:28 +0530 Subject: [PATCH 10/13] use put in tests --- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index a04c7bbfdc96f..0457df3bf4614 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -148,7 +148,7 @@ class TestModifyDagRun: ], ) def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state): - response = test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) + response = test_client.put(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) assert response.status_code == 200 body = response.json() assert body["dag_id"] == dag_id @@ -156,7 +156,7 @@ def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state assert body["state"] == response_state def test_modify_dag_run_not_found(self, test_client): - response = test_client.patch( + response = test_client.put( f"/public/dags/{DAG1_ID}/dagRuns/invalid", json={"state": DagRunState.SUCCESS} ) assert response.status_code == 404 @@ -164,7 +164,7 @@ def test_modify_dag_run_not_found(self, test_client): assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" def test_modify_dag_run_bad_request(self, test_client): - response = test_client.patch( + response = test_client.put( f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", json={"state": "running"} ) assert response.status_code == 422 From 63943b556b5928da6aeb6414165a92c033878a37 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 29 Oct 2024 17:08:09 +0530 Subject: [PATCH 11/13] modify to patch --- .../core_api/openapi/v1-generated.yaml | 2 +- .../core_api/routes/public/dag_run.py | 2 +- airflow/ui/openapi-gen/queries/common.ts | 6 +- airflow/ui/openapi-gen/queries/queries.ts | 94 +++++++++---------- .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_dag_run.py | 6 +- 7 files changed, 57 insertions(+), 57 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index fb5b16f1f651a..fa11fc924cb80 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1156,7 +1156,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - put: + patch: tags: - DagRun summary: Update Dag Run State diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index c7f4f9e38e595..dfe6601fb88e9 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -68,7 +68,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio session.delete(dag_run) -@dag_run_router.put("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) +@dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) async def update_dag_run_state( dag_id: str, dag_run_id: str, diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index c7c3fef3c392e..b563e210aa83e 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -415,9 +415,6 @@ export type VariableServicePostVariableMutationResult = Awaited< export type PoolServicePostPoolMutationResult = Awaited< ReturnType >; -export type DagRunServiceUpdateDagRunStateMutationResult = Awaited< - ReturnType ->; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; @@ -427,6 +424,9 @@ export type DagServicePatchDagMutationResult = Awaited< export type VariableServicePatchVariableMutationResult = Awaited< ReturnType >; +export type DagRunServiceUpdateDagRunStateMutationResult = Awaited< + ReturnType +>; export type PoolServicePatchPoolMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 3fda9b9f2097c..80f0204b34285 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -740,53 +740,6 @@ export const usePoolServicePostPool = < PoolService.postPool({ requestBody }) as unknown as Promise, ...options, }); -/** - * Update Dag Run State - * Modify a DAG Run. - * @param data The data for the request. - * @param data.dagId - * @param data.dagRunId - * @param data.requestBody - * @returns DAGRunResponse Successful Response - * @throws ApiError - */ -export const useDagRunServiceUpdateDagRunState = < - TData = Common.DagRunServiceUpdateDagRunStateMutationResult, - TError = unknown, - TContext = unknown, ->( - options?: Omit< - UseMutationOptions< - TData, - TError, - { - dagId: string; - dagRunId: string; - requestBody: DAGRunPatchBody; - }, - TContext - >, - "mutationFn" - >, -) => - useMutation< - TData, - TError, - { - dagId: string; - dagRunId: string; - requestBody: DAGRunPatchBody; - }, - TContext - >({ - mutationFn: ({ dagId, dagRunId, requestBody }) => - DagRunService.updateDagRunState({ - dagId, - dagRunId, - requestBody, - }) as unknown as Promise, - ...options, - }); /** * Patch Dags * Patch multiple DAGs. @@ -967,6 +920,53 @@ export const useVariableServicePatchVariable = < }) as unknown as Promise, ...options, }); +/** + * Update Dag Run State + * Modify a DAG Run. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.requestBody + * @returns DAGRunResponse Successful Response + * @throws ApiError + */ +export const useDagRunServiceUpdateDagRunState = < + TData = Common.DagRunServiceUpdateDagRunStateMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: string; + dagRunId: string; + requestBody: DAGRunPatchBody; + }, + TContext + >({ + mutationFn: ({ dagId, dagRunId, requestBody }) => + DagRunService.updateDagRunState({ + dagId, + dagRunId, + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Patch Pool * Update a Pool. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 75c64c5ccc00c..2118e3b8853be 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -687,7 +687,7 @@ export class DagRunService { data: UpdateDagRunStateData, ): CancelablePromise { return __request(OpenAPI, { - method: "PUT", + method: "PATCH", url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", path: { dag_id: data.dagId, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index c4892ef05b709..1ca01c525ff2d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1233,7 +1233,7 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; - put: { + patch: { req: UpdateDagRunStateData; res: { /** diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 0c0db952f3239..d647a04edc959 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -148,7 +148,7 @@ class TestModifyDagRun: ], ) def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state): - response = test_client.put(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) + response = test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) assert response.status_code == 200 body = response.json() assert body["dag_id"] == dag_id @@ -156,7 +156,7 @@ def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state assert body["state"] == response_state def test_modify_dag_run_not_found(self, test_client): - response = test_client.put( + response = test_client.patch( f"/public/dags/{DAG1_ID}/dagRuns/invalid", json={"state": DagRunState.SUCCESS} ) assert response.status_code == 404 @@ -164,7 +164,7 @@ def test_modify_dag_run_not_found(self, test_client): assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" def test_modify_dag_run_bad_request(self, test_client): - response = test_client.put( + response = test_client.patch( f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", json={"state": "running"} ) assert response.status_code == 422 From 7802d8fc1219b924348a03f237ebd41d965c387b Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 29 Oct 2024 17:58:32 +0530 Subject: [PATCH 12/13] add update_mask --- .../core_api/openapi/v1-generated.yaml | 10 ++++++++ .../core_api/routes/public/dag_run.py | 24 +++++++++++++------ airflow/ui/openapi-gen/queries/queries.ts | 6 ++++- .../ui/openapi-gen/requests/services.gen.ts | 4 ++++ airflow/ui/openapi-gen/requests/types.gen.ts | 1 + .../core_api/routes/public/test_dag_run.py | 16 +++++++++++++ 6 files changed, 53 insertions(+), 8 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index fa11fc924cb80..d18da0321c81f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1175,6 +1175,16 @@ paths: schema: type: string title: Dag Run Id + - name: update_mask + in: query + required: false + schema: + anyOf: + - type: array + items: + type: string + - type: 'null' + title: Update Mask requestBody: required: true content: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index dfe6601fb88e9..c8f7d2f336636 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -17,7 +17,7 @@ from __future__ import annotations -from fastapi import Depends, HTTPException, Request +from fastapi import Depends, HTTPException, Query, Request from sqlalchemy import select from sqlalchemy.orm import Session from typing_extensions import Annotated @@ -72,9 +72,10 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio async def update_dag_run_state( dag_id: str, dag_run_id: str, - state: DAGRunPatchBody, + patch_body: DAGRunPatchBody, session: Annotated[Session, Depends(get_session)], request: Request, + update_mask: list[str] | None = Query(None), ) -> DAGRunResponse: """Modify a DAG Run.""" dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) @@ -88,12 +89,21 @@ async def update_dag_run_state( if not dag: raise HTTPException(404, f"Dag with id {dag_id} was not found") - if state.state == DAGRunModifyStates.SUCCESS: - set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) - elif state.state == DAGRunModifyStates.QUEUED: - set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) + if update_mask: + if update_mask != ["state"]: + raise HTTPException(400, "Only `state` field can be updated through the REST API") else: - set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) + update_mask = ["state"] + + for attr_name in update_mask: + if attr_name == "state": + state = getattr(patch_body, attr_name) + if state == DAGRunModifyStates.SUCCESS: + set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) + elif state == DAGRunModifyStates.QUEUED: + set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) + else: + set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) dag_run = session.get(DagRun, dag_run.id) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 80f0204b34285..13944621f0814 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -927,6 +927,7 @@ export const useVariableServicePatchVariable = < * @param data.dagId * @param data.dagRunId * @param data.requestBody + * @param data.updateMask * @returns DAGRunResponse Successful Response * @throws ApiError */ @@ -943,6 +944,7 @@ export const useDagRunServiceUpdateDagRunState = < dagId: string; dagRunId: string; requestBody: DAGRunPatchBody; + updateMask?: string[]; }, TContext >, @@ -956,14 +958,16 @@ export const useDagRunServiceUpdateDagRunState = < dagId: string; dagRunId: string; requestBody: DAGRunPatchBody; + updateMask?: string[]; }, TContext >({ - mutationFn: ({ dagId, dagRunId, requestBody }) => + mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => DagRunService.updateDagRunState({ dagId, dagRunId, requestBody, + updateMask, }) as unknown as Promise, ...options, }); diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2118e3b8853be..aebefbcf4005a 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -680,6 +680,7 @@ export class DagRunService { * @param data.dagId * @param data.dagRunId * @param data.requestBody + * @param data.updateMask * @returns DAGRunResponse Successful Response * @throws ApiError */ @@ -693,6 +694,9 @@ export class DagRunService { dag_id: data.dagId, dag_run_id: data.dagRunId, }, + query: { + update_mask: data.updateMask, + }, body: data.requestBody, mediaType: "application/json", errors: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1ca01c525ff2d..84325d103d4c7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -679,6 +679,7 @@ export type UpdateDagRunStateData = { dagId: string; dagRunId: string; requestBody: DAGRunPatchBody; + updateMask?: Array | null; }; export type UpdateDagRunStateResponse = DAGRunResponse; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index d647a04edc959..17f9cab3358fb 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -155,6 +155,22 @@ def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state assert body["run_id"] == run_id assert body["state"] == response_state + @pytest.mark.parametrize( + "query_params,patch_body, expected_status_code", + [ + ({"update_mask": ["state"]}, {"state": DagRunState.SUCCESS}, 200), + ({}, {"state": DagRunState.SUCCESS}, 200), + ({"update_mask": ["random"]}, {"state": DagRunState.SUCCESS}, 400), + ], + ) + def test_modify_dag_run_with_update_mask( + self, test_client, query_params, patch_body, expected_status_code + ): + response = test_client.patch( + f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", params=query_params, json=patch_body + ) + assert response.status_code == expected_status_code + def test_modify_dag_run_not_found(self, test_client): response = test_client.patch( f"/public/dags/{DAG1_ID}/dagRuns/invalid", json={"state": DagRunState.SUCCESS} From da21c064b261ed68cf9e6216958e71dafd27c502 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 29 Oct 2024 19:11:04 +0530 Subject: [PATCH 13/13] refactor update to patch --- .../core_api/openapi/v1-generated.yaml | 22 +++++++++---------- .../core_api/routes/public/dag_run.py | 8 +++---- .../core_api/serializers/dag_run.py | 4 ++-- airflow/ui/openapi-gen/queries/common.ts | 4 ++-- airflow/ui/openapi-gen/queries/queries.ts | 8 +++---- .../ui/openapi-gen/requests/schemas.gen.ts | 16 +++++++------- .../ui/openapi-gen/requests/services.gen.ts | 12 +++++----- airflow/ui/openapi-gen/requests/types.gen.ts | 18 +++++++-------- .../core_api/routes/public/test_dag_run.py | 10 ++++----- 9 files changed, 51 insertions(+), 51 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index d18da0321c81f..4cd275357dd08 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1159,9 +1159,9 @@ paths: patch: tags: - DagRun - summary: Update Dag Run State + summary: Patch Dag Run State description: Modify a DAG Run. - operationId: update_dag_run_state + operationId: patch_dag_run_state parameters: - name: dag_id in: path @@ -2107,23 +2107,23 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. - DAGRunModifyStates: - type: string - enum: - - queued - - success - - failed - title: DAGRunModifyStates - description: Enum for DAG Run states when updating a DAG Run. DAGRunPatchBody: properties: state: - $ref: '#/components/schemas/DAGRunModifyStates' + $ref: '#/components/schemas/DAGRunPatchStates' type: object required: - state title: DAGRunPatchBody description: DAG Run Serializer for PATCH requests. + DAGRunPatchStates: + type: string + enum: + - queued + - success + - failed + title: DAGRunPatchStates + description: Enum for DAG Run states when updating a DAG Run. DAGRunResponse: properties: run_id: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index c8f7d2f336636..02780d6088e94 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -31,8 +31,8 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.serializers.dag_run import ( - DAGRunModifyStates, DAGRunPatchBody, + DAGRunPatchStates, DAGRunResponse, ) from airflow.models import DAG, DagRun @@ -69,7 +69,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio @dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404])) -async def update_dag_run_state( +async def patch_dag_run_state( dag_id: str, dag_run_id: str, patch_body: DAGRunPatchBody, @@ -98,9 +98,9 @@ async def update_dag_run_state( for attr_name in update_mask: if attr_name == "state": state = getattr(patch_body, attr_name) - if state == DAGRunModifyStates.SUCCESS: + if state == DAGRunPatchStates.SUCCESS: set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) - elif state == DAGRunModifyStates.QUEUED: + elif state == DAGRunPatchStates.QUEUED: set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) else: set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) diff --git a/airflow/api_fastapi/core_api/serializers/dag_run.py b/airflow/api_fastapi/core_api/serializers/dag_run.py index d8319059c42f4..15576905611c3 100644 --- a/airflow/api_fastapi/core_api/serializers/dag_run.py +++ b/airflow/api_fastapi/core_api/serializers/dag_run.py @@ -26,7 +26,7 @@ from airflow.utils.types import DagRunTriggeredByType, DagRunType -class DAGRunModifyStates(str, Enum): +class DAGRunPatchStates(str, Enum): """Enum for DAG Run states when updating a DAG Run.""" QUEUED = DagRunState.QUEUED @@ -37,7 +37,7 @@ class DAGRunModifyStates(str, Enum): class DAGRunPatchBody(BaseModel): """DAG Run Serializer for PATCH requests.""" - state: DAGRunModifyStates + state: DAGRunPatchStates class DAGRunResponse(BaseModel): diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b563e210aa83e..4a58a21f74dc7 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -424,8 +424,8 @@ export type DagServicePatchDagMutationResult = Awaited< export type VariableServicePatchVariableMutationResult = Awaited< ReturnType >; -export type DagRunServiceUpdateDagRunStateMutationResult = Awaited< - ReturnType +export type DagRunServicePatchDagRunStateMutationResult = Awaited< + ReturnType >; export type PoolServicePatchPoolMutationResult = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 13944621f0814..d01f25ee933fd 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -921,7 +921,7 @@ export const useVariableServicePatchVariable = < ...options, }); /** - * Update Dag Run State + * Patch Dag Run State * Modify a DAG Run. * @param data The data for the request. * @param data.dagId @@ -931,8 +931,8 @@ export const useVariableServicePatchVariable = < * @returns DAGRunResponse Successful Response * @throws ApiError */ -export const useDagRunServiceUpdateDagRunState = < - TData = Common.DagRunServiceUpdateDagRunStateMutationResult, +export const useDagRunServicePatchDagRunState = < + TData = Common.DagRunServicePatchDagRunStateMutationResult, TError = unknown, TContext = unknown, >( @@ -963,7 +963,7 @@ export const useDagRunServiceUpdateDagRunState = < TContext >({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => - DagRunService.updateDagRunState({ + DagRunService.patchDagRunState({ dagId, dagRunId, requestBody, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 709679685a120..2909ed40bc8b2 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -887,17 +887,10 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; -export const $DAGRunModifyStates = { - type: "string", - enum: ["queued", "success", "failed"], - title: "DAGRunModifyStates", - description: "Enum for DAG Run states when updating a DAG Run.", -} as const; - export const $DAGRunPatchBody = { properties: { state: { - $ref: "#/components/schemas/DAGRunModifyStates", + $ref: "#/components/schemas/DAGRunPatchStates", }, }, type: "object", @@ -906,6 +899,13 @@ export const $DAGRunPatchBody = { description: "DAG Run Serializer for PATCH requests.", } as const; +export const $DAGRunPatchStates = { + type: "string", + enum: ["queued", "success", "failed"], + title: "DAGRunPatchStates", + description: "Enum for DAG Run states when updating a DAG Run.", +} as const; + export const $DAGRunResponse = { properties: { run_id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index aebefbcf4005a..dc398bd6d2ffc 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -43,8 +43,8 @@ import type { GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, - UpdateDagRunStateData, - UpdateDagRunStateResponse, + PatchDagRunStateData, + PatchDagRunStateResponse, GetHealthResponse, DeletePoolData, DeletePoolResponse, @@ -674,7 +674,7 @@ export class DagRunService { } /** - * Update Dag Run State + * Patch Dag Run State * Modify a DAG Run. * @param data The data for the request. * @param data.dagId @@ -684,9 +684,9 @@ export class DagRunService { * @returns DAGRunResponse Successful Response * @throws ApiError */ - public static updateDagRunState( - data: UpdateDagRunStateData, - ): CancelablePromise { + public static patchDagRunState( + data: PatchDagRunStateData, + ): CancelablePromise { return __request(OpenAPI, { method: "PATCH", url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 84325d103d4c7..034e24555e8fc 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -154,18 +154,18 @@ export type DAGResponse = { readonly file_token: string; }; -/** - * Enum for DAG Run states when updating a DAG Run. - */ -export type DAGRunModifyStates = "queued" | "success" | "failed"; - /** * DAG Run Serializer for PATCH requests. */ export type DAGRunPatchBody = { - state: DAGRunModifyStates; + state: DAGRunPatchStates; }; +/** + * Enum for DAG Run states when updating a DAG Run. + */ +export type DAGRunPatchStates = "queued" | "success" | "failed"; + /** * DAG Run serializer for responses. */ @@ -675,14 +675,14 @@ export type DeleteDagRunData = { export type DeleteDagRunResponse = void; -export type UpdateDagRunStateData = { +export type PatchDagRunStateData = { dagId: string; dagRunId: string; requestBody: DAGRunPatchBody; updateMask?: Array | null; }; -export type UpdateDagRunStateResponse = DAGRunResponse; +export type PatchDagRunStateResponse = DAGRunResponse; export type GetHealthResponse = HealthInfoSchema; @@ -1235,7 +1235,7 @@ export type $OpenApiTs = { }; }; patch: { - req: UpdateDagRunStateData; + req: PatchDagRunStateData; res: { /** * Successful Response diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 17f9cab3358fb..dfd48af2fa530 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -138,7 +138,7 @@ def test_get_dag_run_not_found(self, test_client): assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" -class TestModifyDagRun: +class TestPatchDagRun: @pytest.mark.parametrize( "dag_id, run_id, state, response_state", [ @@ -147,7 +147,7 @@ class TestModifyDagRun: (DAG2_ID, DAG2_RUN1_ID, DagRunState.QUEUED, DagRunState.QUEUED), ], ) - def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state): + def test_patch_dag_run(self, test_client, dag_id, run_id, state, response_state): response = test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) assert response.status_code == 200 body = response.json() @@ -163,7 +163,7 @@ def test_modify_dag_run(self, test_client, dag_id, run_id, state, response_state ({"update_mask": ["random"]}, {"state": DagRunState.SUCCESS}, 400), ], ) - def test_modify_dag_run_with_update_mask( + def test_patch_dag_run_with_update_mask( self, test_client, query_params, patch_body, expected_status_code ): response = test_client.patch( @@ -171,7 +171,7 @@ def test_modify_dag_run_with_update_mask( ) assert response.status_code == expected_status_code - def test_modify_dag_run_not_found(self, test_client): + def test_patch_dag_run_not_found(self, test_client): response = test_client.patch( f"/public/dags/{DAG1_ID}/dagRuns/invalid", json={"state": DagRunState.SUCCESS} ) @@ -179,7 +179,7 @@ def test_modify_dag_run_not_found(self, test_client): body = response.json() assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" - def test_modify_dag_run_bad_request(self, test_client): + def test_patch_dag_run_bad_request(self, test_client): response = test_client.patch( f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", json={"state": "running"} )