From 7e8b8e61f1f7306183db51c5dfaf2e8d0f93ebaf Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 30 Oct 2024 16:40:30 +0530 Subject: [PATCH 01/14] include dag_run_note in update_mask --- .../api_fastapi/core_api/openapi/v1-generated.yaml | 6 ++++++ .../api_fastapi/core_api/routes/public/dag_run.py | 11 ++++++++--- airflow/api_fastapi/core_api/serializers/dag_run.py | 1 + airflow/ui/openapi-gen/requests/schemas.gen.ts | 13 ++++++++++++- airflow/ui/openapi-gen/requests/types.gen.ts | 1 + 5 files changed, 28 insertions(+), 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index ae5fc9e117738..465467c2c00b0 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2155,9 +2155,15 @@ components: properties: state: $ref: '#/components/schemas/DAGRunPatchStates' + note: + anyOf: + - type: string + - type: 'null' + title: Note type: object required: - state + - note title: DAGRunPatchBody description: DAG Run Serializer for PATCH requests. DAGRunPatchStates: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 02780d6088e94..ec74c77d5082c 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -78,6 +78,7 @@ async def patch_dag_run_state( update_mask: list[str] | None = Query(None), ) -> DAGRunResponse: """Modify a DAG Run.""" + ALLOWED_FIELD_MASK = ["state", "note"] dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) if dag_run is None: raise HTTPException( @@ -90,10 +91,11 @@ async def patch_dag_run_state( raise HTTPException(404, f"Dag with id {dag_id} was not found") if update_mask: - if update_mask != ["state"]: - raise HTTPException(400, "Only `state` field can be updated through the REST API") + for each in update_mask: + if each not in ALLOWED_FIELD_MASK: + raise HTTPException(400, f"Invalid field `{each}` in update mask") else: - update_mask = ["state"] + update_mask = ALLOWED_FIELD_MASK for attr_name in update_mask: if attr_name == "state": @@ -104,6 +106,9 @@ async def patch_dag_run_state( set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) else: set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) + elif attr_name == "note": + # Need to figure out how to get current user id + pass dag_run = session.get(DagRun, dag_run.id) diff --git a/airflow/api_fastapi/core_api/serializers/dag_run.py b/airflow/api_fastapi/core_api/serializers/dag_run.py index 15576905611c3..78ece30b5dbfd 100644 --- a/airflow/api_fastapi/core_api/serializers/dag_run.py +++ b/airflow/api_fastapi/core_api/serializers/dag_run.py @@ -38,6 +38,7 @@ class DAGRunPatchBody(BaseModel): """DAG Run Serializer for PATCH requests.""" state: DAGRunPatchStates + note: str | None class DAGRunResponse(BaseModel): diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 3982407c5f181..f460b5d7225e2 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -892,9 +892,20 @@ export const $DAGRunPatchBody = { state: { $ref: "#/components/schemas/DAGRunPatchStates", }, + note: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Note", + }, }, type: "object", - required: ["state"], + required: ["state", "note"], title: "DAGRunPatchBody", description: "DAG Run Serializer for PATCH requests.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index a3b1d8e6bef68..d9a0e75923546 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -159,6 +159,7 @@ export type DAGResponse = { */ export type DAGRunPatchBody = { state: DAGRunPatchStates; + note: string | null; }; /** From 7efff15d873f827bf6e3e9f86dbceea29e45e57d Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 31 Oct 2024 11:27:28 +0530 Subject: [PATCH 02/14] add dag run note --- .../core_api/openapi/v1-generated.yaml | 2 +- .../core_api/routes/public/dag_run.py | 16 +++++++++++----- .../api_fastapi/core_api/serializers/dag_run.py | 2 +- airflow/ui/openapi-gen/requests/schemas.gen.ts | 3 ++- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 465467c2c00b0..9f18cb2a5113a 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2158,12 +2158,12 @@ components: note: anyOf: - type: string + maxLength: 1000 - type: 'null' title: Note type: object required: - state - - note title: DAGRunPatchBody description: DAG Run Serializer for PATCH requests. DAGRunPatchStates: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index ec74c77d5082c..e7cf724a144d2 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -98,17 +98,23 @@ async def patch_dag_run_state( update_mask = ALLOWED_FIELD_MASK for attr_name in update_mask: + attr_value = getattr(patch_body, attr_name) if attr_name == "state": - state = getattr(patch_body, attr_name) - if state == DAGRunPatchStates.SUCCESS: + if attr_value == DAGRunPatchStates.SUCCESS: set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) - elif state == DAGRunPatchStates.QUEUED: + elif attr_value == DAGRunPatchStates.QUEUED: set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) else: set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) elif attr_name == "note": - # Need to figure out how to get current user id - pass + # Once Authentication is implemented in this FastAPI app, + # user id will be added when updating dag run note + # Refer to https://github.com/apache/airflow/issues/43534 + if dag_run.dag_run_note is None: + dag_run.note = (attr_value, None) + else: + dag_run.dag_run_note.content = attr_value + dag_run.dag_run_note.user_id = None dag_run = session.get(DagRun, dag_run.id) diff --git a/airflow/api_fastapi/core_api/serializers/dag_run.py b/airflow/api_fastapi/core_api/serializers/dag_run.py index 78ece30b5dbfd..4a07ab79a3063 100644 --- a/airflow/api_fastapi/core_api/serializers/dag_run.py +++ b/airflow/api_fastapi/core_api/serializers/dag_run.py @@ -38,7 +38,7 @@ class DAGRunPatchBody(BaseModel): """DAG Run Serializer for PATCH requests.""" state: DAGRunPatchStates - note: str | None + note: str | None = Field(None, max_length=1000) class DAGRunResponse(BaseModel): diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index f460b5d7225e2..81b5243fd6243 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -896,6 +896,7 @@ export const $DAGRunPatchBody = { anyOf: [ { type: "string", + maxLength: 1000, }, { type: "null", @@ -905,7 +906,7 @@ export const $DAGRunPatchBody = { }, }, type: "object", - required: ["state", "note"], + required: ["state"], title: "DAGRunPatchBody", description: "DAG Run Serializer for PATCH requests.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index d9a0e75923546..ee8be1dbbba7d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -159,7 +159,7 @@ export type DAGResponse = { */ export type DAGRunPatchBody = { state: DAGRunPatchStates; - note: string | null; + note?: string | null; }; /** From 31f052d4820618f0ae83b0c9063c6670a8927508 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 31 Oct 2024 14:55:13 +0530 Subject: [PATCH 03/14] state can be none --- airflow/api_fastapi/core_api/openapi/v1-generated.yaml | 4 +++- airflow/api_fastapi/core_api/routes/public/dag_run.py | 10 +++++++--- airflow/api_fastapi/core_api/serializers/dag_run.py | 2 +- airflow/ui/openapi-gen/requests/schemas.gen.ts | 9 ++++++++- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9f18cb2a5113a..2d209b361778b 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2154,7 +2154,9 @@ components: DAGRunPatchBody: properties: state: - $ref: '#/components/schemas/DAGRunPatchStates' + anyOf: + - $ref: '#/components/schemas/DAGRunPatchStates' + - type: 'null' note: anyOf: - type: string diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index e7cf724a144d2..443332dbae880 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -19,7 +19,7 @@ from fastapi import Depends, HTTPException, Query, Request from sqlalchemy import select -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, joinedload from typing_extensions import Annotated from airflow.api.common.mark_tasks import ( @@ -79,7 +79,9 @@ async def patch_dag_run_state( ) -> DAGRunResponse: """Modify a DAG Run.""" ALLOWED_FIELD_MASK = ["state", "note"] - dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) + dag_run = session.scalar( + select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_run_note)) + ) if dag_run is None: raise HTTPException( 404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found" @@ -100,6 +102,8 @@ async def patch_dag_run_state( for attr_name in update_mask: attr_value = getattr(patch_body, attr_name) if attr_name == "state": + if attr_value is None: + raise HTTPException(400, "state cannot be empty when it is included in the update mask") if attr_value == DAGRunPatchStates.SUCCESS: set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) elif attr_value == DAGRunPatchStates.QUEUED: @@ -115,7 +119,7 @@ async def patch_dag_run_state( else: dag_run.dag_run_note.content = attr_value dag_run.dag_run_note.user_id = None - + session.commit() dag_run = session.get(DagRun, dag_run.id) return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/api_fastapi/core_api/serializers/dag_run.py b/airflow/api_fastapi/core_api/serializers/dag_run.py index 4a07ab79a3063..9d992d363f32b 100644 --- a/airflow/api_fastapi/core_api/serializers/dag_run.py +++ b/airflow/api_fastapi/core_api/serializers/dag_run.py @@ -37,7 +37,7 @@ class DAGRunPatchStates(str, Enum): class DAGRunPatchBody(BaseModel): """DAG Run Serializer for PATCH requests.""" - state: DAGRunPatchStates + state: DAGRunPatchStates | None note: str | None = Field(None, max_length=1000) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 81b5243fd6243..f7c13ae89ac74 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -890,7 +890,14 @@ export const $DAGResponse = { export const $DAGRunPatchBody = { properties: { state: { - $ref: "#/components/schemas/DAGRunPatchStates", + anyOf: [ + { + $ref: "#/components/schemas/DAGRunPatchStates", + }, + { + type: "null", + }, + ], }, note: { anyOf: [ diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index ee8be1dbbba7d..dc3e0a3e37959 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -158,7 +158,7 @@ export type DAGResponse = { * DAG Run Serializer for PATCH requests. */ export type DAGRunPatchBody = { - state: DAGRunPatchStates; + state: DAGRunPatchStates | null; note?: string | null; }; From 21ac907c1e037b3ece449e8faf51fc6517230710 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 31 Oct 2024 17:52:23 +0530 Subject: [PATCH 04/14] add test --- .../core_api/openapi/v1-generated.yaml | 2 -- .../core_api/routes/public/dag_run.py | 4 +-- .../core_api/serializers/dag_run.py | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 1 - airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_dag_run.py | 32 +++++++++++++------ 6 files changed, 27 insertions(+), 16 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 2d209b361778b..b8b6d849ec78d 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2164,8 +2164,6 @@ components: - type: 'null' title: Note type: object - required: - - state title: DAGRunPatchBody description: DAG Run Serializer for PATCH requests. DAGRunPatchStates: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 443332dbae880..df0ffd25222ca 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -103,7 +103,7 @@ async def patch_dag_run_state( attr_value = getattr(patch_body, attr_name) if attr_name == "state": if attr_value is None: - raise HTTPException(400, "state cannot be empty when it is included in the update mask") + continue if attr_value == DAGRunPatchStates.SUCCESS: set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) elif attr_value == DAGRunPatchStates.QUEUED: @@ -118,8 +118,8 @@ async def patch_dag_run_state( dag_run.note = (attr_value, None) else: dag_run.dag_run_note.content = attr_value - dag_run.dag_run_note.user_id = None session.commit() dag_run = session.get(DagRun, dag_run.id) + print(f"return val: {dag_run.dag_run_note.content}") return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/api_fastapi/core_api/serializers/dag_run.py b/airflow/api_fastapi/core_api/serializers/dag_run.py index 9d992d363f32b..759c4399fbd70 100644 --- a/airflow/api_fastapi/core_api/serializers/dag_run.py +++ b/airflow/api_fastapi/core_api/serializers/dag_run.py @@ -37,7 +37,7 @@ class DAGRunPatchStates(str, Enum): class DAGRunPatchBody(BaseModel): """DAG Run Serializer for PATCH requests.""" - state: DAGRunPatchStates | None + state: DAGRunPatchStates | None = None note: str | None = Field(None, max_length=1000) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index f7c13ae89ac74..735750337b7be 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -913,7 +913,6 @@ export const $DAGRunPatchBody = { }, }, type: "object", - required: ["state"], title: "DAGRunPatchBody", description: "DAG Run Serializer for PATCH requests.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index dc3e0a3e37959..3a2ffcbc90dae 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -158,7 +158,7 @@ export type DAGResponse = { * DAG Run Serializer for PATCH requests. */ export type DAGRunPatchBody = { - state: DAGRunPatchStates | null; + state?: DAGRunPatchStates | null; note?: string | null; }; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index dfd48af2fa530..ceb0d6da36075 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -66,13 +66,13 @@ def setup(dag_maker, session=None): start_date=START_DATE, ): EmptyOperator(task_id="task_1") - dag1 = dag_maker.create_dagrun( + dag_run1 = dag_maker.create_dagrun( run_id=DAG1_RUN1_ID, state=DAG1_RUN1_STATE, run_type=DAG1_RUN1_RUN_TYPE, triggered_by=DAG1_RUN1_TRIGGERED_BY, ) - dag1.note = (DAG1_NOTE, 1) + dag_run1.note = (DAG1_NOTE, 1) dag_maker.create_dagrun( run_id=DAG1_RUN2_ID, @@ -140,20 +140,34 @@ def test_get_dag_run_not_found(self, test_client): class TestPatchDagRun: @pytest.mark.parametrize( - "dag_id, run_id, state, response_state", + "dag_id, run_id, patch_body, response_body", [ - (DAG1_ID, DAG1_RUN1_ID, DagRunState.FAILED, DagRunState.FAILED), - (DAG1_ID, DAG1_RUN2_ID, DagRunState.SUCCESS, DagRunState.SUCCESS), - (DAG2_ID, DAG2_RUN1_ID, DagRunState.QUEUED, DagRunState.QUEUED), + (DAG1_ID, DAG1_RUN1_ID, {"state": DagRunState.FAILED}, {"state": DagRunState.FAILED}), + (DAG1_ID, DAG1_RUN2_ID, {"state": DagRunState.SUCCESS}, {"state": DagRunState.SUCCESS}), + (DAG2_ID, DAG2_RUN1_ID, {"state": DagRunState.QUEUED}, {"state": DagRunState.QUEUED}), + ( + DAG1_ID, + DAG1_RUN1_ID, + {"note": "updated note"}, + {"state": DagRunState.SUCCESS, "note": "updated note"}, + ), + ( + DAG1_ID, + DAG1_RUN2_ID, + {"note": "new note", "state": DagRunState.FAILED}, + {"state": DagRunState.FAILED, "note": "new note"}, + ), + (DAG1_ID, DAG1_RUN2_ID, {"note": None}, {"state": DagRunState.FAILED, "note": None}), ], ) - def test_patch_dag_run(self, test_client, dag_id, run_id, state, response_state): - response = test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json={"state": state}) + def test_patch_dag_run(self, test_client, dag_id, run_id, patch_body, response_body): + response = test_client.patch(f"/public/dags/{dag_id}/dagRuns/{run_id}", json=patch_body) assert response.status_code == 200 body = response.json() assert body["dag_id"] == dag_id assert body["run_id"] == run_id - assert body["state"] == response_state + assert body.get("state") == response_body.get("state") + assert body.get("note") == response_body.get("note") @pytest.mark.parametrize( "query_params,patch_body, expected_status_code", From e75afb79ae7f06f594064e55211f3ff1a97ef92c Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Tue, 5 Nov 2024 17:59:52 +0100 Subject: [PATCH 05/14] Fix tests --- .../core_api/routes/public/dag_run.py | 27 ++++++++------- .../core_api/routes/public/test_dag_run.py | 34 +++++++++++++++---- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index f44bdf74d39b1..41ae0b1f1c707 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -130,18 +130,22 @@ async def patch_dag_run_state( else: update_mask = ALLOWED_FIELD_MASK + if "state" in update_mask: + attr_value = getattr(patch_body, "state") + if attr_value == DAGRunPatchStates.SUCCESS: + set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif attr_value == DAGRunPatchStates.QUEUED: + set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif attr_value == DAGRunPatchStates.FAILED: + set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + + dag_run = session.get(DagRun, dag_run.id) + for attr_name in update_mask: attr_value = getattr(patch_body, attr_name) - if attr_name == "state": - if attr_value is None: - continue - if attr_value == DAGRunPatchStates.SUCCESS: - set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True) - elif attr_value == DAGRunPatchStates.QUEUED: - set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True) - else: - set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) - elif attr_name == "note": + if attr_value is None: + continue + if attr_name == "note": # Once Authentication is implemented in this FastAPI app, # user id will be added when updating dag run note # Refer to https://github.com/apache/airflow/issues/43534 @@ -149,8 +153,5 @@ async def patch_dag_run_state( dag_run.note = (attr_value, None) else: dag_run.dag_run_note.content = attr_value - session.commit() - dag_run = session.get(DagRun, dag_run.id) - print(f"return val: {dag_run.dag_run_note.content}") return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index ceb0d6da36075..fff13723c0fd4 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -50,7 +50,7 @@ DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc) EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc) -DAG1_NOTE = "test_note" +DAG1_RUN1_NOTE = "test_note" @pytest.fixture(autouse=True) @@ -72,7 +72,7 @@ def setup(dag_maker, session=None): run_type=DAG1_RUN1_RUN_TYPE, triggered_by=DAG1_RUN1_TRIGGERED_BY, ) - dag_run1.note = (DAG1_NOTE, 1) + dag_run1.note = (DAG1_RUN1_NOTE, 1) dag_maker.create_dagrun( run_id=DAG1_RUN2_ID, @@ -114,7 +114,14 @@ class TestGetDagRun: @pytest.mark.parametrize( "dag_id, run_id, state, run_type, triggered_by, dag_run_note", [ - (DAG1_ID, DAG1_RUN1_ID, DAG1_RUN1_STATE, DAG1_RUN1_RUN_TYPE, DAG1_RUN1_TRIGGERED_BY, DAG1_NOTE), + ( + DAG1_ID, + DAG1_RUN1_ID, + DAG1_RUN1_STATE, + DAG1_RUN1_RUN_TYPE, + DAG1_RUN1_TRIGGERED_BY, + DAG1_RUN1_NOTE, + ), (DAG1_ID, DAG1_RUN2_ID, DAG1_RUN2_STATE, DAG1_RUN2_RUN_TYPE, DAG1_RUN2_TRIGGERED_BY, None), (DAG2_ID, DAG2_RUN1_ID, DAG2_RUN1_STATE, DAG2_RUN1_RUN_TYPE, DAG2_RUN1_TRIGGERED_BY, None), (DAG2_ID, DAG2_RUN2_ID, DAG2_RUN2_STATE, DAG2_RUN2_RUN_TYPE, DAG2_RUN2_TRIGGERED_BY, None), @@ -142,9 +149,24 @@ class TestPatchDagRun: @pytest.mark.parametrize( "dag_id, run_id, patch_body, response_body", [ - (DAG1_ID, DAG1_RUN1_ID, {"state": DagRunState.FAILED}, {"state": DagRunState.FAILED}), - (DAG1_ID, DAG1_RUN2_ID, {"state": DagRunState.SUCCESS}, {"state": DagRunState.SUCCESS}), - (DAG2_ID, DAG2_RUN1_ID, {"state": DagRunState.QUEUED}, {"state": DagRunState.QUEUED}), + ( + DAG1_ID, + DAG1_RUN1_ID, + {"state": DagRunState.FAILED}, + {"state": DagRunState.FAILED, "note": DAG1_RUN1_NOTE}, + ), + ( + DAG1_ID, + DAG1_RUN2_ID, + {"state": DagRunState.SUCCESS}, + {"state": DagRunState.SUCCESS, "note": None}, + ), + ( + DAG2_ID, + DAG2_RUN1_ID, + {"state": DagRunState.QUEUED}, + {"state": DagRunState.QUEUED, "note": None}, + ), ( DAG1_ID, DAG1_RUN1_ID, From 903888465638f5ec44ff61547e6e72498e27d2e0 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 6 Nov 2024 18:13:28 +0530 Subject: [PATCH 06/14] handle edge cases --- .../core_api/openapi/v1-generated.yaml | 4 +- .../core_api/routes/public/dag_run.py | 52 ++++++++++++------- airflow/ui/openapi-gen/queries/common.ts | 4 +- airflow/ui/openapi-gen/queries/queries.ts | 8 +-- .../ui/openapi-gen/requests/services.gen.ts | 12 ++--- airflow/ui/openapi-gen/requests/types.gen.ts | 6 +-- 6 files changed, 49 insertions(+), 37 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index b6d03729c1389..78a1f5a0f201f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1209,9 +1209,9 @@ paths: patch: tags: - DagRun - summary: Patch Dag Run State + summary: Patch Dag Run description: Modify a DAG Run. - operationId: patch_dag_run_state + operationId: patch_dag_run parameters: - name: dag_id in: path diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 41ae0b1f1c707..e5fc4d331c73b 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -99,7 +99,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio ] ), ) -async def patch_dag_run_state( +async def patch_dag_run( dag_id: str, dag_run_id: str, patch_body: DAGRunPatchBody, @@ -124,34 +124,46 @@ async def patch_dag_run_state( raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") if update_mask: - for each in update_mask: - if each not in ALLOWED_FIELD_MASK: - raise HTTPException(400, f"Invalid field `{each}` in update mask") - else: - update_mask = ALLOWED_FIELD_MASK - - if "state" in update_mask: - attr_value = getattr(patch_body, "state") - if attr_value == DAGRunPatchStates.SUCCESS: - set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True, session=session) - elif attr_value == DAGRunPatchStates.QUEUED: - set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) - elif attr_value == DAGRunPatchStates.FAILED: - set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + update_mask_set = set(update_mask) + validation_errors = [] - dag_run = session.get(DagRun, dag_run.id) + invalid_fields = update_mask_set - set(ALLOWED_FIELD_MASK) + if invalid_fields: + validation_errors.append(f"Invalid fields in update mask: {', '.join(invalid_fields)}") + + missing_fields = update_mask_set - patch_body.model_fields_set + if missing_fields: + validation_errors.append(f"Fields not present in request body: {', '.join(missing_fields)}") + + if validation_errors: + raise HTTPException(400, "; ".join(validation_errors)) - for attr_name in update_mask: + fields_to_update = { + field + for field in ALLOWED_FIELD_MASK + if field in patch_body.model_fields_set and (update_mask is None or field in update_mask) + } + + for attr_name in fields_to_update: attr_value = getattr(patch_body, attr_name) - if attr_value is None: - continue - if attr_name == "note": + if attr_name == "state": + attr_value = getattr(patch_body, "state") + if attr_value == DAGRunPatchStates.SUCCESS: + set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif attr_value == DAGRunPatchStates.QUEUED: + set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif attr_value == DAGRunPatchStates.FAILED: + set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True, session=session) + elif attr_name == "note": # Once Authentication is implemented in this FastAPI app, # user id will be added when updating dag run note # Refer to https://github.com/apache/airflow/issues/43534 + dag_run = session.get(DagRun, dag_run.id) if dag_run.dag_run_note is None: dag_run.note = (attr_value, None) else: dag_run.dag_run_note.content = attr_value + dag_run = session.get(DagRun, dag_run.id) + return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 1248a77ce188d..121cc23b2bf28 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -658,8 +658,8 @@ export type DagServicePatchDagsMutationResult = Awaited< export type DagServicePatchDagMutationResult = Awaited< ReturnType >; -export type DagRunServicePatchDagRunStateMutationResult = Awaited< - ReturnType +export type DagRunServicePatchDagRunMutationResult = Awaited< + ReturnType >; export type PoolServicePatchPoolMutationResult = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 70796be401719..12e749143423f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1387,7 +1387,7 @@ export const useDagServicePatchDag = < ...options, }); /** - * Patch Dag Run State + * Patch Dag Run * Modify a DAG Run. * @param data The data for the request. * @param data.dagId @@ -1397,8 +1397,8 @@ export const useDagServicePatchDag = < * @returns DAGRunResponse Successful Response * @throws ApiError */ -export const useDagRunServicePatchDagRunState = < - TData = Common.DagRunServicePatchDagRunStateMutationResult, +export const useDagRunServicePatchDagRun = < + TData = Common.DagRunServicePatchDagRunMutationResult, TError = unknown, TContext = unknown, >( @@ -1429,7 +1429,7 @@ export const useDagRunServicePatchDagRunState = < TContext >({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => - DagRunService.patchDagRunState({ + DagRunService.patchDagRun({ dagId, dagRunId, requestBody, diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index fa5c7739c909b..94f5eb61d54b5 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -45,8 +45,8 @@ import type { GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, - PatchDagRunStateData, - PatchDagRunStateResponse, + PatchDagRunData, + PatchDagRunResponse, GetDagSourceData, GetDagSourceResponse, GetEventLogData, @@ -717,7 +717,7 @@ export class DagRunService { } /** - * Patch Dag Run State + * Patch Dag Run * Modify a DAG Run. * @param data The data for the request. * @param data.dagId @@ -727,9 +727,9 @@ export class DagRunService { * @returns DAGRunResponse Successful Response * @throws ApiError */ - public static patchDagRunState( - data: PatchDagRunStateData, - ): CancelablePromise { + public static patchDagRun( + data: PatchDagRunData, + ): CancelablePromise { return __request(OpenAPI, { method: "PATCH", url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 2bba54873aeb2..b99e658b4aa0f 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -838,14 +838,14 @@ export type DeleteDagRunData = { export type DeleteDagRunResponse = void; -export type PatchDagRunStateData = { +export type PatchDagRunData = { dagId: string; dagRunId: string; requestBody: DAGRunPatchBody; updateMask?: Array | null; }; -export type PatchDagRunStateResponse = DAGRunResponse; +export type PatchDagRunResponse = DAGRunResponse; export type GetDagSourceData = { accept?: string; @@ -1538,7 +1538,7 @@ export type $OpenApiTs = { }; }; patch: { - req: PatchDagRunStateData; + req: PatchDagRunData; res: { /** * Successful Response From fda670737eb9ec2138054c97221366c84128f11b Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 6 Nov 2024 19:12:42 +0530 Subject: [PATCH 07/14] add tests --- .../core_api/routes/public/dag_run.py | 2 +- .../core_api/routes/public/test_dag_run.py | 30 +++++++++++++++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index e5fc4d331c73b..8be6b1c133a85 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -131,7 +131,7 @@ async def patch_dag_run( if invalid_fields: validation_errors.append(f"Invalid fields in update mask: {', '.join(invalid_fields)}") - missing_fields = update_mask_set - patch_body.model_fields_set + missing_fields = update_mask_set - invalid_fields - patch_body.model_fields_set if missing_fields: validation_errors.append(f"Fields not present in request body: {', '.join(missing_fields)}") diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index fff13723c0fd4..e49a3676a9536 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -192,20 +192,40 @@ def test_patch_dag_run(self, test_client, dag_id, run_id, patch_body, response_b assert body.get("note") == response_body.get("note") @pytest.mark.parametrize( - "query_params,patch_body, expected_status_code", + "query_params,patch_body,response_body,expected_status_code", [ - ({"update_mask": ["state"]}, {"state": DagRunState.SUCCESS}, 200), - ({}, {"state": DagRunState.SUCCESS}, 200), - ({"update_mask": ["random"]}, {"state": DagRunState.SUCCESS}, 400), + ({"update_mask": ["state"]}, {"state": DagRunState.SUCCESS}, {"state": "success"}, 200), + ( + {"update_mask": ["note"]}, + {"state": DagRunState.FAILED, "note": "new_note1"}, + {"note": "new_note1", "state": "success"}, + 200, + ), + ( + {}, + {"state": DagRunState.FAILED, "note": "new_note2"}, + {"note": "new_note2", "state": "failed"}, + 200, + ), + ({"update_mask": ["state"]}, {}, {"detail": "Fields not present in request body: state"}, 400), + ( + {"update_mask": ["random"]}, + {"state": DagRunState.SUCCESS}, + {"detail": "Invalid fields in update mask: random"}, + 400, + ), ], ) def test_patch_dag_run_with_update_mask( - self, test_client, query_params, patch_body, expected_status_code + self, test_client, query_params, patch_body, response_body, expected_status_code ): response = test_client.patch( f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", params=query_params, json=patch_body ) + response_json = response.json() assert response.status_code == expected_status_code + for key, value in response_body.items(): + assert response_json.get(key) == value def test_patch_dag_run_not_found(self, test_client): response = test_client.patch( From aeb64aae1a8a1be21d3998e173e7722a69e12997 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 7 Nov 2024 18:03:30 +0530 Subject: [PATCH 08/14] remove joinedload --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 8be6b1c133a85..af2bb12a9ac41 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -19,7 +19,7 @@ from fastapi import Depends, HTTPException, Query, Request, status from sqlalchemy import select -from sqlalchemy.orm import Session, joinedload +from sqlalchemy.orm import Session from typing_extensions import Annotated from airflow.api.common.mark_tasks import ( @@ -109,9 +109,7 @@ async def patch_dag_run( ) -> DAGRunResponse: """Modify a DAG Run.""" ALLOWED_FIELD_MASK = ["state", "note"] - dag_run = session.scalar( - select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_run_note)) - ) + dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) if dag_run is None: raise HTTPException( status.HTTP_404_NOT_FOUND, From 8276ea7762e990ee3ed910f58054a8b07a29c06c Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 7 Nov 2024 18:50:11 +0530 Subject: [PATCH 09/14] fix update_mask checks --- .../core_api/routes/public/dag_run.py | 26 +++---------------- .../core_api/routes/public/test_dag_run.py | 8 +++--- 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index af2bb12a9ac41..08654b36eabd7 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -108,7 +108,6 @@ async def patch_dag_run( update_mask: list[str] | None = Query(None), ) -> DAGRunResponse: """Modify a DAG Run.""" - ALLOWED_FIELD_MASK = ["state", "note"] dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) if dag_run is None: raise HTTPException( @@ -122,28 +121,11 @@ async def patch_dag_run( raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") if update_mask: - update_mask_set = set(update_mask) - validation_errors = [] + data = patch_body.model_dump(include=set(update_mask), exclude_unset=True) + else: + data = patch_body.model_dump(exclude_unset=True) - invalid_fields = update_mask_set - set(ALLOWED_FIELD_MASK) - if invalid_fields: - validation_errors.append(f"Invalid fields in update mask: {', '.join(invalid_fields)}") - - missing_fields = update_mask_set - invalid_fields - patch_body.model_fields_set - if missing_fields: - validation_errors.append(f"Fields not present in request body: {', '.join(missing_fields)}") - - if validation_errors: - raise HTTPException(400, "; ".join(validation_errors)) - - fields_to_update = { - field - for field in ALLOWED_FIELD_MASK - if field in patch_body.model_fields_set and (update_mask is None or field in update_mask) - } - - for attr_name in fields_to_update: - attr_value = getattr(patch_body, attr_name) + for attr_name, attr_value in data.items(): if attr_name == "state": attr_value = getattr(patch_body, "state") if attr_value == DAGRunPatchStates.SUCCESS: diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index e49a3676a9536..19c5919b074ec 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -207,12 +207,12 @@ def test_patch_dag_run(self, test_client, dag_id, run_id, patch_body, response_b {"note": "new_note2", "state": "failed"}, 200, ), - ({"update_mask": ["state"]}, {}, {"detail": "Fields not present in request body: state"}, 400), + ({"update_mask": ["note"]}, {}, {"state": "success", "note": "test_note"}, 200), ( {"update_mask": ["random"]}, - {"state": DagRunState.SUCCESS}, - {"detail": "Invalid fields in update mask: random"}, - 400, + {"state": DagRunState.FAILED}, + {"state": "success", "note": "test_note"}, + 200, ), ], ) From cb02a623812f3d6b27cc5e3d6ab0692f383fe28a Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 7 Nov 2024 19:07:27 +0530 Subject: [PATCH 10/14] fix tests --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 08654b36eabd7..bb335c82b76d9 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -121,7 +121,7 @@ async def patch_dag_run( raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") if update_mask: - data = patch_body.model_dump(include=set(update_mask), exclude_unset=True) + data = patch_body.model_dump(include=set(update_mask)) else: data = patch_body.model_dump(exclude_unset=True) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 19c5919b074ec..4cc260d7bc586 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -207,7 +207,7 @@ def test_patch_dag_run(self, test_client, dag_id, run_id, patch_body, response_b {"note": "new_note2", "state": "failed"}, 200, ), - ({"update_mask": ["note"]}, {}, {"state": "success", "note": "test_note"}, 200), + ({"update_mask": ["note"]}, {}, {"state": "success", "note": None}, 200), ( {"update_mask": ["random"]}, {"state": DagRunState.FAILED}, From b51c02bbbbdc1fd74dcc169eb28bcac220b47721 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 8 Nov 2024 05:41:38 +0530 Subject: [PATCH 11/14] fix --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index bb335c82b76d9..75d225fea1e32 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -123,7 +123,7 @@ async def patch_dag_run( if update_mask: data = patch_body.model_dump(include=set(update_mask)) else: - data = patch_body.model_dump(exclude_unset=True) + data = patch_body.model_dump() for attr_name, attr_value in data.items(): if attr_name == "state": diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 4cc260d7bc586..878ca6c2d14d0 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -152,8 +152,8 @@ class TestPatchDagRun: ( DAG1_ID, DAG1_RUN1_ID, - {"state": DagRunState.FAILED}, - {"state": DagRunState.FAILED, "note": DAG1_RUN1_NOTE}, + {"state": DagRunState.FAILED, "note": "new_note2"}, + {"state": DagRunState.FAILED, "note": "new_note2"}, ), ( DAG1_ID, From 4710df205169f85bcec6e423d046d015b1aa1677 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 8 Nov 2024 05:45:31 +0530 Subject: [PATCH 12/14] remove async --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 75d225fea1e32..2a4976a369ca2 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -99,7 +99,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio ] ), ) -async def patch_dag_run( +def patch_dag_run( dag_id: str, dag_run_id: str, patch_body: DAGRunPatchBody, From e31af72becbe11b138abb5a3f7c653e387a6e44c Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 8 Nov 2024 05:50:57 +0530 Subject: [PATCH 13/14] undo async --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 2a4976a369ca2..75d225fea1e32 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -99,7 +99,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio ] ), ) -def patch_dag_run( +async def patch_dag_run( dag_id: str, dag_run_id: str, patch_body: DAGRunPatchBody, From 8711b00b60c6049b7a8ee661564dd44004bd23c3 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 8 Nov 2024 17:57:21 +0530 Subject: [PATCH 14/14] fix --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index a3160b398a4b4..7778d7778fa17 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -144,6 +144,6 @@ def patch_dag_run( else: dag_run.dag_run_note.content = attr_value - session.refresh(dag_run) + dag_run = session.get(DagRun, dag_run.id) return DAGRunResponse.model_validate(dag_run, from_attributes=True)