From ed49619902c5c2d0d502b7cbeede928bb4d55160 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 20 Nov 2024 09:12:44 +0530 Subject: [PATCH 01/15] init --- .../endpoints/dag_run_endpoint.py | 1 + .../core_api/datamodels/dag_run.py | 11 ++ .../core_api/openapi/v1-generated.yaml | 106 ++++++++++++++++++ .../core_api/routes/public/dag_run.py | 18 +++ airflow/ui/openapi-gen/queries/common.ts | 3 + airflow/ui/openapi-gen/queries/queries.ts | 44 ++++++++ .../ui/openapi-gen/requests/schemas.gen.ts | 85 ++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 33 ++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 56 +++++++++ 9 files changed, 357 insertions(+) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index b8e7f36d1fd43..a9b8bc6273260 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -302,6 +302,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse: return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs, total_entries=total_entries)) +@mark_fastapi_migration_done @security.requires_access_dag("POST", DagAccessEntity.RUN) @action_logging @provide_session diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 8241885aff2fe..13582ba00c583 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -65,3 +65,14 @@ class DAGRunResponse(BaseModel): triggered_by: DagRunTriggeredByType conf: dict note: str | None + + +class TriggerDAGRunPostBody(BaseModel): + """Trigger DAG Run Serializer for POST body.""" + + dag_run_id: str | None + logical_date: datetime | None + data_interval_start: datetime | None + data_interval_end: datetime | None + conf: dict | None + note: str | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9b42d5c3a8145..9da7bd9f3b1a3 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1483,6 +1483,67 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns: + post: + tags: + - DagRun + summary: Trigger Dag Run + description: Trigger a DAG Run. + operationId: trigger_dag_run + parameters: + - name: dag_id + in: path + required: true + schema: + title: Dag Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/TriggerDAGRunPostBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '409': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Conflict + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dagSources/{file_token}: get: tags: @@ -6605,6 +6666,51 @@ components: - microseconds title: TimeDelta description: TimeDelta can be used to interact with datetime.timedelta objects. + TriggerDAGRunPostBody: + properties: + dag_run_id: + anyOf: + - type: string + - type: 'null' + title: Dag Run Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval Start + data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval End + conf: + anyOf: + - type: object + - type: 'null' + title: Conf + note: + anyOf: + - type: string + - type: 'null' + title: Note + type: object + required: + - dag_run_id + - logical_date + - data_interval_start + - data_interval_end + - conf + - note + title: TriggerDAGRunPostBody + description: Trigger DAG Run Serializer for POST body. TriggerResponse: properties: id: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index decc7ff2b285c..222a08f4acce1 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -36,6 +36,7 @@ DAGRunPatchBody, DAGRunPatchStates, DAGRunResponse, + TriggerDAGRunPostBody, ) from airflow.api_fastapi.core_api.datamodels.task_instances import ( TaskInstanceCollectionResponse, @@ -228,3 +229,20 @@ def clear_dag_run( ) dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) return DAGRunResponse.model_validate(dag_run_cleared, from_attributes=True) + + +@dag_run_router.post( + "", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + ] + ), +) +def trigger_dag_run( + dag_id, post_body: TriggerDAGRunPostBody, session: Annotated[Session, Depends(get_session)] +): + """Trigger a DAG Run.""" + pass diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 14377a81a968c..3a4baeaee882f 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1122,6 +1122,9 @@ export type ConnectionServiceTestConnectionMutationResult = Awaited< export type DagRunServiceClearDagRunMutationResult = Awaited< ReturnType >; +export type DagRunServiceTriggerDagRunMutationResult = Awaited< + ReturnType +>; export type PoolServicePostPoolMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index c461db254de2d..ea21c8868eec4 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -41,6 +41,7 @@ import { PoolPatchBody, PoolPostBody, TaskInstancesBatchBody, + TriggerDAGRunPostBody, VariableBody, } from "../requests/types.gen"; import * as Common from "./common"; @@ -1997,6 +1998,49 @@ export const useDagRunServiceClearDagRun = < }) as unknown as Promise, ...options, }); +/** + * Trigger Dag Run + * Trigger a DAG Run. + * @param data The data for the request. + * @param data.dagId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagRunServiceTriggerDagRun = < + TData = Common.DagRunServiceTriggerDagRunMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: unknown; + requestBody: TriggerDAGRunPostBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: unknown; + requestBody: TriggerDAGRunPostBody; + }, + TContext + >({ + mutationFn: ({ dagId, requestBody }) => + DagRunService.triggerDagRun({ + dagId, + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Post Pool * Create a Pool. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1f83e434286b8..f53c7411d6584 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4002,6 +4002,91 @@ export const $TimeDelta = { "TimeDelta can be used to interact with datetime.timedelta objects.", } as const; +export const $TriggerDAGRunPostBody = { + properties: { + dag_run_id: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Dag Run Id", + }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, + data_interval_start: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Data Interval Start", + }, + data_interval_end: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Data Interval End", + }, + conf: { + anyOf: [ + { + type: "object", + }, + { + type: "null", + }, + ], + title: "Conf", + }, + note: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Note", + }, + }, + type: "object", + required: [ + "dag_run_id", + "logical_date", + "data_interval_start", + "data_interval_end", + "conf", + "note", + ], + title: "TriggerDAGRunPostBody", + description: "Trigger DAG Run Serializer for POST body.", +} as const; + export const $TriggerResponse = { properties: { id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2d75d9811b127..fe6daa3842c1a 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -57,6 +57,8 @@ import type { GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, + TriggerDagRunData, + TriggerDagRunResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, @@ -924,6 +926,37 @@ export class DagRunService { }, }); } + + /** + * Trigger Dag Run + * Trigger a DAG Run. + * @param data The data for the request. + * @param data.dagId + * @param data.requestBody + * @returns unknown Successful Response + * @throws ApiError + */ + public static triggerDagRun( + data: TriggerDagRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "POST", + url: "/public/dags/{dag_id}/dagRuns", + path: { + dag_id: data.dagId, + }, + body: data.requestBody, + mediaType: "application/json", + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 409: "Conflict", + 422: "Validation Error", + }, + }); + } } export class DagSourceService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 85a8a7140874e..99ae7d9c1e01a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -934,6 +934,20 @@ export type TimeDelta = { microseconds: number; }; +/** + * Trigger DAG Run Serializer for POST body. + */ +export type TriggerDAGRunPostBody = { + dag_run_id: string | null; + logical_date: string | null; + data_interval_start: string | null; + data_interval_end: string | null; + conf: { + [key: string]: unknown; + } | null; + note: string | null; +}; + /** * Trigger serializer for responses. */ @@ -1224,6 +1238,13 @@ export type ClearDagRunResponse = | TaskInstanceCollectionResponse | DAGRunResponse; +export type TriggerDagRunData = { + dagId: unknown; + requestBody: TriggerDAGRunPostBody; +}; + +export type TriggerDagRunResponse = unknown; + export type GetDagSourceData = { accept?: string; fileToken: string; @@ -2263,6 +2284,41 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns": { + post: { + req: TriggerDagRunData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Conflict + */ + 409: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dagSources/{file_token}": { get: { req: GetDagSourceData; From 028c9e582e0b6800a54a94ba98c732fb76e93c54 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 20 Nov 2024 10:37:12 +0530 Subject: [PATCH 02/15] wip --- .../core_api/datamodels/dag_run.py | 2 +- .../core_api/openapi/v1-generated.yaml | 8 +- .../core_api/routes/public/dag_run.py | 74 +++++++++++++++++-- airflow/ui/openapi-gen/queries/queries.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 11 +-- .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 7 files changed, 78 insertions(+), 23 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 13582ba00c583..ee2da3fb6ae6e 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -71,7 +71,7 @@ class TriggerDAGRunPostBody(BaseModel): """Trigger DAG Run Serializer for POST body.""" dag_run_id: str | None - logical_date: datetime | None + logical_date: datetime data_interval_start: datetime | None data_interval_end: datetime | None conf: dict | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9da7bd9f3b1a3..4ed6f0c977c6b 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1488,7 +1488,7 @@ paths: tags: - DagRun summary: Trigger Dag Run - description: Trigger a DAG Run. + description: Trigger a DAG. operationId: trigger_dag_run parameters: - name: dag_id @@ -6674,10 +6674,8 @@ components: - type: 'null' title: Dag Run Id logical_date: - anyOf: - - type: string - format: date-time - - type: 'null' + type: string + format: date-time title: Logical Date data_interval_start: anyOf: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 222a08f4acce1..7a2779800f543 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -19,8 +19,9 @@ from typing import Annotated +import pendulum from fastapi import Depends, HTTPException, Query, Request, status -from sqlalchemy import select +from sqlalchemy import or_, select from sqlalchemy.orm import Session from airflow.api.common.mark_tasks import ( @@ -43,7 +44,11 @@ TaskInstanceResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.models import DAG, DagRun +from airflow.models import DAG, DagModel, DagRun +from airflow.models.dag_version import DagVersion +from airflow.timetables.base import DataInterval +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunTriggeredByType, DagRunType dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") @@ -242,7 +247,66 @@ def clear_dag_run( ), ) def trigger_dag_run( - dag_id, post_body: TriggerDAGRunPostBody, session: Annotated[Session, Depends(get_session)] + dag_id, body: TriggerDAGRunPostBody, request: Request, session: Annotated[Session, Depends(get_session)] ): - """Trigger a DAG Run.""" - pass + """Trigger a DAG.""" + dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1)) + if not dm: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found") + + if dm.has_import_errors: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", + ) + + logical_date = pendulum.instance(body.logical_date) + run_id = body.dag_run_id + dagrun_instance = session.scalar( + select(DagRun) + .where( + DagRun.dag_id == dag_id, + or_(DagRun.run_id == run_id, DagRun.logical_date == logical_date), + ) + .limit(1) + ) + if not dagrun_instance: + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + + if body.data_interval_start and body.data_interval_end: + data_interval = DataInterval( + start=pendulum.instance(body.data_interval_start), + end=pendulum.instance(body.data_interval_end), + ) + else: + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + dag_version = DagVersion.get_latest_version(dag.dag_id) + dag_run = dag.create_dagrun( + run_type=DagRunType.MANUAL, + run_id=run_id, + logical_date=logical_date, + data_interval=data_interval, + state=DagRunState.QUEUED, + conf=body.conf, + external_trigger=True, + dag_version=dag_version, + session=session, + triggered_by=DagRunTriggeredByType.REST_API, + ) + dag_run_note = body.note + if dag_run_note: + current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 + dag_run.note = (dag_run_note, current_user_id) + return DAGRunResponse.model_validate(dag_run, from_attributes=True) + + if dagrun_instance.logical_date == logical_date: + raise HTTPException( + status.HTTP_409_CONFLICT, + f"DAGRun with DAG ID: '{dag_id}' and " + f"DAGRun logical date: '{logical_date.isoformat(sep=' ')}' already exists", + ) + + raise HTTPException( + status.HTTP_409_CONFLICT, + f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{body.dag_run_id}' already exists", + ) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index ea21c8868eec4..fddb167080809 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2000,7 +2000,7 @@ export const useDagRunServiceClearDagRun = < }); /** * Trigger Dag Run - * Trigger a DAG Run. + * Trigger a DAG. * @param data The data for the request. * @param data.dagId * @param data.requestBody diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index f53c7411d6584..ecf1f3045b7b9 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4016,15 +4016,8 @@ export const $TriggerDAGRunPostBody = { title: "Dag Run Id", }, logical_date: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], + type: "string", + format: "date-time", title: "Logical Date", }, data_interval_start: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index fe6daa3842c1a..b0515a6d54238 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -929,7 +929,7 @@ export class DagRunService { /** * Trigger Dag Run - * Trigger a DAG Run. + * Trigger a DAG. * @param data The data for the request. * @param data.dagId * @param data.requestBody diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 99ae7d9c1e01a..a677de3d1afdc 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -939,7 +939,7 @@ export type TimeDelta = { */ export type TriggerDAGRunPostBody = { dag_run_id: string | null; - logical_date: string | null; + logical_date: string; data_interval_start: string | null; data_interval_end: string | null; conf: { From 1a3a96cac7ab3996bc1580ec3dfef20f5c753a66 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 21 Nov 2024 23:31:57 +0530 Subject: [PATCH 03/15] remove logical_date --- .../api_fastapi/core_api/datamodels/dag_run.py | 3 +-- .../core_api/openapi/v1-generated.yaml | 9 +-------- .../core_api/routes/public/dag_run.py | 16 +++++----------- airflow/ui/openapi-gen/requests/schemas.gen.ts | 15 +-------------- airflow/ui/openapi-gen/requests/types.gen.ts | 3 +-- 5 files changed, 9 insertions(+), 37 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index eeed9c014215e..e7d05a356d21f 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -77,8 +77,7 @@ class DAGRunCollectionResponse(BaseModel): class TriggerDAGRunPostBody(BaseModel): """Trigger DAG Run Serializer for POST body.""" - dag_run_id: str | None - logical_date: datetime + dag_run_id: str data_interval_start: datetime | None data_interval_end: datetime | None conf: dict | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index f24f1db58a1a7..59c0545a24d1d 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7514,14 +7514,8 @@ components: TriggerDAGRunPostBody: properties: dag_run_id: - anyOf: - - type: string - - type: 'null' - title: Dag Run Id - logical_date: type: string - format: date-time - title: Logical Date + title: Dag Run Id data_interval_start: anyOf: - type: string @@ -7547,7 +7541,6 @@ components: type: object required: - dag_run_id - - logical_date - data_interval_start - data_interval_end - conf diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 75b1ae35c14a0..571c228ccd693 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -56,6 +56,7 @@ from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval +from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -333,16 +334,16 @@ def trigger_dag_run( f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", ) - logical_date = pendulum.instance(body.logical_date) run_id = body.dag_run_id dagrun_instance = session.scalar( select(DagRun) .where( DagRun.dag_id == dag_id, - or_(DagRun.run_id == run_id, DagRun.logical_date == logical_date), + or_(DagRun.run_id == run_id), ) .limit(1) ) + if not dagrun_instance: dag: DAG = request.app.state.dag_bag.get_dag(dag_id) @@ -352,12 +353,12 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + now = pendulum.instance(timezone.utcnow()) + data_interval = dag.timetable.infer_manual_data_interval(run_after=now) dag_version = DagVersion.get_latest_version(dag.dag_id) dag_run = dag.create_dagrun( run_type=DagRunType.MANUAL, run_id=run_id, - logical_date=logical_date, data_interval=data_interval, state=DagRunState.QUEUED, conf=body.conf, @@ -372,13 +373,6 @@ def trigger_dag_run( dag_run.note = (dag_run_note, current_user_id) return DAGRunResponse.model_validate(dag_run, from_attributes=True) - if dagrun_instance.logical_date == logical_date: - raise HTTPException( - status.HTTP_409_CONFLICT, - f"DAGRun with DAG ID: '{dag_id}' and " - f"DAGRun logical date: '{logical_date.isoformat(sep=' ')}' already exists", - ) - raise HTTPException( status.HTTP_409_CONFLICT, f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{body.dag_run_id}' already exists", diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index ebf18842fbc58..b825154550315 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4316,20 +4316,8 @@ export const $TimeDelta = { export const $TriggerDAGRunPostBody = { properties: { dag_run_id: { - anyOf: [ - { - type: "string", - }, - { - type: "null", - }, - ], - title: "Dag Run Id", - }, - logical_date: { type: "string", - format: "date-time", - title: "Logical Date", + title: "Dag Run Id", }, data_interval_start: { anyOf: [ @@ -4381,7 +4369,6 @@ export const $TriggerDAGRunPostBody = { type: "object", required: [ "dag_run_id", - "logical_date", "data_interval_start", "data_interval_end", "conf", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 3cbfc07cf2619..3778f0d1bc2f8 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -999,8 +999,7 @@ export type TimeDelta = { * Trigger DAG Run Serializer for POST body. */ export type TriggerDAGRunPostBody = { - dag_run_id: string | null; - logical_date: string; + dag_run_id: string; data_interval_start: string | null; data_interval_end: string | null; conf: { From faab1d4780bf287a885dd5b2b752f1ce96d2a9ac Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 09:00:35 +0530 Subject: [PATCH 04/15] fix trigger dag_run --- .../core_api/datamodels/dag_run.py | 36 ++++++++++++++++--- .../core_api/openapi/v1-generated.yaml | 14 +++++--- .../core_api/routes/public/dag_run.py | 14 +++----- .../ui/openapi-gen/requests/schemas.gen.ts | 29 ++++++++++----- airflow/ui/openapi-gen/requests/types.gen.ts | 9 ++--- .../core_api/routes/public/test_dag_run.py | 4 +++ 6 files changed, 74 insertions(+), 32 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index e7d05a356d21f..8a1c8174fc89d 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,8 +20,10 @@ from datetime import datetime from enum import Enum -from pydantic import BaseModel, Field +from pydantic import AwareDatetime, BaseModel, Field, computed_field, model_validator +from airflow.models import DagRun +from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -77,8 +79,32 @@ class DAGRunCollectionResponse(BaseModel): class TriggerDAGRunPostBody(BaseModel): """Trigger DAG Run Serializer for POST body.""" - dag_run_id: str - data_interval_start: datetime | None - data_interval_end: datetime | None - conf: dict | None + dag_run_id: str | None = None + logical_date: AwareDatetime | None = None + data_interval_start: AwareDatetime | None = None + data_interval_end: AwareDatetime | None = None + + conf: dict | None = Field(default_factory=dict) note: str | None + + @model_validator(mode="after") + def check_data_intervals(cls, values): + data_interval_start = values.get("data_interval_start") + data_interval_end = values.get("data_interval_end") + if (data_interval_start is None) != (data_interval_end is None): + raise ValueError( + "Either both data_interval_start and data_interval_end must be provided or both must be None" + ) + return values + + @model_validator(mode="after") + def validate_dag_run_id(self): + if not self.dag_run_id: + self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date) + return self + + # Mypy issue https://github.com/python/mypy/issues/1362 + @computed_field # type: ignore[misc] + @property + def _logical_date(self) -> datetime: + return self.logical_date or timezone.utcnow() diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 59c0545a24d1d..48da3d3df7f3a 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7514,8 +7514,16 @@ components: TriggerDAGRunPostBody: properties: dag_run_id: - type: string + anyOf: + - type: string + - type: 'null' title: Dag Run Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date data_interval_start: anyOf: - type: string @@ -7540,10 +7548,6 @@ components: title: Note type: object required: - - dag_run_id - - data_interval_start - - data_interval_end - - conf - note title: TriggerDAGRunPostBody description: Trigger DAG Run Serializer for POST body. diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 571c228ccd693..219873788da51 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -21,7 +21,7 @@ import pendulum from fastapi import Depends, HTTPException, Query, Request, status -from sqlalchemy import or_, select +from sqlalchemy import select from sqlalchemy.orm import Session from airflow.api.common.mark_tasks import ( @@ -56,7 +56,6 @@ from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval -from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -336,12 +335,7 @@ def trigger_dag_run( run_id = body.dag_run_id dagrun_instance = session.scalar( - select(DagRun) - .where( - DagRun.dag_id == dag_id, - or_(DagRun.run_id == run_id), - ) - .limit(1) + select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id).limit(1) ) if not dagrun_instance: @@ -353,8 +347,8 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - now = pendulum.instance(timezone.utcnow()) - data_interval = dag.timetable.infer_manual_data_interval(run_after=now) + logical_date = pendulum.instance(body._logical_date) + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) dag_version = DagVersion.get_latest_version(dag.dag_id) dag_run = dag.create_dagrun( run_type=DagRunType.MANUAL, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index b825154550315..3137f2d54c283 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4316,9 +4316,28 @@ export const $TimeDelta = { export const $TriggerDAGRunPostBody = { properties: { dag_run_id: { - type: "string", + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], title: "Dag Run Id", }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, data_interval_start: { anyOf: [ { @@ -4367,13 +4386,7 @@ export const $TriggerDAGRunPostBody = { }, }, type: "object", - required: [ - "dag_run_id", - "data_interval_start", - "data_interval_end", - "conf", - "note", - ], + required: ["note"], title: "TriggerDAGRunPostBody", description: "Trigger DAG Run Serializer for POST body.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 3778f0d1bc2f8..5dadef19d1d9a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -999,10 +999,11 @@ export type TimeDelta = { * Trigger DAG Run Serializer for POST body. */ export type TriggerDAGRunPostBody = { - dag_run_id: string; - data_interval_start: string | null; - data_interval_end: string | null; - conf: { + dag_run_id?: string | null; + logical_date?: string | null; + data_interval_start?: string | null; + data_interval_end?: string | null; + conf?: { [key: string]: unknown; } | null; note: string | null; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index b3ce267bf5222..7bb3055abe4aa 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -652,3 +652,7 @@ def test_clear_dag_run_unprocessable_entity(self, test_client): body = response.json() assert body["detail"][0]["msg"] == "Field required" assert body["detail"][0]["loc"][0] == "body" + + +class TestTriggerDagRun: + pass From b07d3b37dbc82137aeef2454f9de21ee029b7062 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 12:30:03 +0530 Subject: [PATCH 05/15] tests WIP --- .../core_api/datamodels/dag_run.py | 4 +- .../core_api/routes/public/dag_run.py | 3 +- .../core_api/routes/public/test_dag_run.py | 76 ++++++++++++++++++- 3 files changed, 76 insertions(+), 7 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 8a1c8174fc89d..1c62734e7556b 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -89,9 +89,7 @@ class TriggerDAGRunPostBody(BaseModel): @model_validator(mode="after") def check_data_intervals(cls, values): - data_interval_start = values.get("data_interval_start") - data_interval_end = values.get("data_interval_end") - if (data_interval_start is None) != (data_interval_end is None): + if (values.data_interval_start is None) != (values.data_interval_end is None): raise ValueError( "Either both data_interval_start and data_interval_end must be provided or both must be None" ) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 219873788da51..c2298d67f9488 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -334,6 +334,7 @@ def trigger_dag_run( ) run_id = body.dag_run_id + logical_date = pendulum.instance(body._logical_date) dagrun_instance = session.scalar( select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id).limit(1) ) @@ -347,12 +348,12 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - logical_date = pendulum.instance(body._logical_date) data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) dag_version = DagVersion.get_latest_version(dag.dag_id) dag_run = dag.create_dagrun( run_type=DagRunType.MANUAL, run_id=run_id, + logical_date=logical_date, data_interval=data_interval, state=DagRunState.QUEUED, conf=body.conf, diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 7bb3055abe4aa..4abc790c4be43 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -17,15 +17,17 @@ from __future__ import annotations -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta import pytest +import time_machine from sqlalchemy import select from airflow.models import DagRun from airflow.models.asset import AssetEvent, AssetModel from airflow.operators.empty import EmptyOperator from airflow.sdk.definitions.asset import Asset +from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -63,11 +65,14 @@ @pytest.fixture(autouse=True) @provide_session -def setup(dag_maker, session=None): +def setup(request, dag_maker, session=None): clear_db_runs() clear_db_dags() clear_db_serialized_dags() + if "no_setup" in request.keywords: + return + with dag_maker( DAG1_ID, schedule="@daily", @@ -654,5 +659,70 @@ def test_clear_dag_run_unprocessable_entity(self, test_client): assert body["detail"][0]["loc"][0] == "body" +# @pytest.mark.no_setup class TestTriggerDagRun: - pass + @time_machine.travel(timezone.utcnow(), tick=False) + @pytest.mark.parametrize( + "dag_run_id, logical_date, note, data_interval_start, data_interval_end", + [ + ("dag_run_5", LOGICAL_DATE1, "test_note", LOGICAL_DATE1, LOGICAL_DATE2 + timedelta(days=1)), + ], + ) + def test_should_respond_200( + self, + test_client, + session, + dag_run_id, + logical_date, + note, + data_interval_start, + data_interval_end, + ): + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", + json={ + "dag_run_id": dag_run_id, + "logical_date": logical_date.isoformat(), + "note": note, + "data_interval_start": data_interval_start.isoformat(), + "data_interval_end": data_interval_end.isoformat(), + }, + ) + + assert response.status_code == 200 + now = timezone.utcnow().isoformat().replace("+00:00", "Z") + if logical_date is None: + logical_date = now + else: + logical_date = logical_date.isoformat().replace("+00:00", "Z") + + if dag_run_id is None: + dag_run_id = f"manual__{logical_date}" + + if data_interval_end is None and data_interval_start is None: + data_interval_end = now + data_interval_start = now + else: + data_interval_end = data_interval_end.isoformat().replace("+00:00", "Z") + data_interval_start = data_interval_start.isoformat().replace("+00:00", "Z") + + expected_response_json = { + "conf": {}, + "dag_id": DAG1_ID, + "run_id": dag_run_id, + "end_date": None, + "logical_date": logical_date, + "external_trigger": True, + "start_date": None, + "state": "queued", + "data_interval_end": data_interval_end, + "data_interval_start": data_interval_start, + "queued_at": now, + "last_scheduling_decision": None, + "run_type": "manual", + "note": note, + "triggered_by": "rest_api", + } + + assert response.json() == expected_response_json + # _check_last_log(session, dag_id=DAG1_ID, event="api.post_dag_run", logical_date=None) From c426fcb3e124c6df7f63f85d95c927e808cb6362 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 22:13:38 +0530 Subject: [PATCH 06/15] working tests --- .../core_api/datamodels/dag_run.py | 12 +- .../core_api/openapi/v1-generated.yaml | 3 +- .../core_api/routes/public/dag_run.py | 55 ++-- .../ui/openapi-gen/requests/schemas.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_dag_run.py | 288 ++++++++++++++++-- 6 files changed, 304 insertions(+), 58 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 1c62734e7556b..5e7eb006875a8 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,6 +20,7 @@ from datetime import datetime from enum import Enum +from fastapi import HTTPException, status from pydantic import AwareDatetime, BaseModel, Field, computed_field, model_validator from airflow.models import DagRun @@ -85,20 +86,23 @@ class TriggerDAGRunPostBody(BaseModel): data_interval_end: AwareDatetime | None = None conf: dict | None = Field(default_factory=dict) - note: str | None + note: str | None = None + + model_config = {"extra": "forbid"} @model_validator(mode="after") def check_data_intervals(cls, values): if (values.data_interval_start is None) != (values.data_interval_end is None): - raise ValueError( - "Either both data_interval_start and data_interval_end must be provided or both must be None" + raise HTTPException( + status.HTTP_422_UNPROCESSABLE_ENTITY, + "Either both data_interval_start and data_interval_end must be provided or both must be None", ) return values @model_validator(mode="after") def validate_dag_run_id(self): if not self.dag_run_id: - self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date) + self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self._logical_date) return self # Mypy issue https://github.com/python/mypy/issues/1362 diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 48da3d3df7f3a..6f99c923ce28e 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7546,9 +7546,8 @@ components: - type: string - type: 'null' title: Note + additionalProperties: false type: object - required: - - note title: TriggerDAGRunPostBody description: Trigger DAG Run Serializer for POST body. TriggerResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index c2298d67f9488..96b8eccc5eec5 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -340,33 +340,36 @@ def trigger_dag_run( ) if not dagrun_instance: - dag: DAG = request.app.state.dag_bag.get_dag(dag_id) - - if body.data_interval_start and body.data_interval_end: - data_interval = DataInterval( - start=pendulum.instance(body.data_interval_start), - end=pendulum.instance(body.data_interval_end), + try: + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + + if body.data_interval_start and body.data_interval_end: + data_interval = DataInterval( + start=pendulum.instance(body.data_interval_start), + end=pendulum.instance(body.data_interval_end), + ) + else: + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + dag_version = DagVersion.get_latest_version(dag.dag_id) + dag_run = dag.create_dagrun( + run_type=DagRunType.MANUAL, + run_id=run_id, + logical_date=logical_date, + data_interval=data_interval, + state=DagRunState.QUEUED, + conf=body.conf, + external_trigger=True, + dag_version=dag_version, + session=session, + triggered_by=DagRunTriggeredByType.REST_API, ) - else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - dag_version = DagVersion.get_latest_version(dag.dag_id) - dag_run = dag.create_dagrun( - run_type=DagRunType.MANUAL, - run_id=run_id, - logical_date=logical_date, - data_interval=data_interval, - state=DagRunState.QUEUED, - conf=body.conf, - external_trigger=True, - dag_version=dag_version, - session=session, - triggered_by=DagRunTriggeredByType.REST_API, - ) - dag_run_note = body.note - if dag_run_note: - current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 - dag_run.note = (dag_run_note, current_user_id) - return DAGRunResponse.model_validate(dag_run, from_attributes=True) + dag_run_note = body.note + if dag_run_note: + current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 + dag_run.note = (dag_run_note, current_user_id) + return DAGRunResponse.model_validate(dag_run, from_attributes=True) + except ValueError as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) raise HTTPException( status.HTTP_409_CONFLICT, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 3137f2d54c283..e3664b2b96992 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4385,8 +4385,8 @@ export const $TriggerDAGRunPostBody = { title: "Note", }, }, + additionalProperties: false, type: "object", - required: ["note"], title: "TriggerDAGRunPostBody", description: "Trigger DAG Run Serializer for POST body.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 5dadef19d1d9a..061bdeaed8393 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1006,7 +1006,7 @@ export type TriggerDAGRunPostBody = { conf?: { [key: string]: unknown; } | null; - note: string | null; + note?: string | null; }; /** diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 4abc790c4be43..3376f51d3a2a5 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -18,12 +18,13 @@ from __future__ import annotations from datetime import datetime, timedelta +from unittest import mock import pytest import time_machine from sqlalchemy import select -from airflow.models import DagRun +from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent, AssetModel from airflow.operators.empty import EmptyOperator from airflow.sdk.definitions.asset import Asset @@ -75,7 +76,7 @@ def setup(request, dag_maker, session=None): with dag_maker( DAG1_ID, - schedule="@daily", + schedule=None, start_date=START_DATE1, ): task1 = EmptyOperator(task_id="task_1") @@ -661,63 +662,108 @@ def test_clear_dag_run_unprocessable_entity(self, test_client): # @pytest.mark.no_setup class TestTriggerDagRun: + def _dags_for_trigger_tests(self, session=None): + inactive_dag = DagModel( + dag_id="inactive", + fileloc="/tmp/dag_del_1.py", + timetable_summary="2 2 * * *", + is_active=False, + is_paused=True, + owners="test_owner,another_test_owner", + next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + ) + + import_errors_dag = DagModel( + dag_id="import_errors", + fileloc="/tmp/dag_del_2.py", + timetable_summary="2 2 * * *", + is_active=True, + owners="test_owner,another_test_owner", + next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + ) + import_errors_dag.has_import_errors = True + + session.add(inactive_dag) + session.add(import_errors_dag) + session.commit() + @time_machine.travel(timezone.utcnow(), tick=False) @pytest.mark.parametrize( "dag_run_id, logical_date, note, data_interval_start, data_interval_end", [ - ("dag_run_5", LOGICAL_DATE1, "test_note", LOGICAL_DATE1, LOGICAL_DATE2 + timedelta(days=1)), + ("dag_run_5", "2020-06-11T18:00:00+00:00", "test-note", None, None), + ( + "dag_run_6", + "2024-06-11T18:00:00+00:00", + "test-note", + "2024-01-03T00:00:00+00:00", + "2024-01-04T05:00:00+00:00", + ), + (None, "2020-06-11T18:00:00+00:00", None, None, None), + (None, None, None, None, None), ], ) def test_should_respond_200( self, test_client, - session, dag_run_id, logical_date, note, data_interval_start, data_interval_end, ): + fixed_now = timezone.utcnow().isoformat() + + request_json = {"note": note} + if logical_date is not None: + request_json["logical_date"] = logical_date + if dag_run_id is not None: + request_json["dag_run_id"] = dag_run_id + if data_interval_start is not None: + request_json["data_interval_start"] = data_interval_start + if data_interval_end is not None: + request_json["data_interval_end"] = data_interval_end + response = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", json={ "dag_run_id": dag_run_id, - "logical_date": logical_date.isoformat(), + "logical_date": logical_date, "note": note, - "data_interval_start": data_interval_start.isoformat(), - "data_interval_end": data_interval_end.isoformat(), + "data_interval_start": data_interval_start, + "data_interval_end": data_interval_end, }, ) assert response.status_code == 200 - now = timezone.utcnow().isoformat().replace("+00:00", "Z") + if logical_date is None: - logical_date = now + expected_logical_date = fixed_now else: - logical_date = logical_date.isoformat().replace("+00:00", "Z") - + expected_logical_date = logical_date if dag_run_id is None: - dag_run_id = f"manual__{logical_date}" - - if data_interval_end is None and data_interval_start is None: - data_interval_end = now - data_interval_start = now + expected_dag_run_id = f"manual__{expected_logical_date}" else: - data_interval_end = data_interval_end.isoformat().replace("+00:00", "Z") - data_interval_start = data_interval_start.isoformat().replace("+00:00", "Z") + expected_dag_run_id = dag_run_id + + expected_data_interval_start = expected_logical_date.replace("+00:00", "Z") + expected_data_interval_end = expected_logical_date.replace("+00:00", "Z") + if data_interval_start is not None and data_interval_end is not None: + expected_data_interval_start = data_interval_start.replace("+00:00", "Z") + expected_data_interval_end = data_interval_end.replace("+00:00", "Z") expected_response_json = { "conf": {}, "dag_id": DAG1_ID, - "run_id": dag_run_id, + "run_id": expected_dag_run_id, "end_date": None, - "logical_date": logical_date, + "logical_date": expected_logical_date.replace("+00:00", "Z"), "external_trigger": True, "start_date": None, "state": "queued", - "data_interval_end": data_interval_end, - "data_interval_start": data_interval_start, - "queued_at": now, + "data_interval_end": expected_data_interval_end, + "data_interval_start": expected_data_interval_start, + "queued_at": fixed_now.replace("+00:00", "Z"), "last_scheduling_decision": None, "run_type": "manual", "note": note, @@ -725,4 +771,198 @@ def test_should_respond_200( } assert response.json() == expected_response_json - # _check_last_log(session, dag_id=DAG1_ID, event="api.post_dag_run", logical_date=None) + + @pytest.mark.parametrize( + "post_body, expected_detail", + [ + ( + {"executiondate": "2020-11-10T08:25:56Z"}, + { + "detail": [ + { + "input": "2020-11-10T08:25:56Z", + "loc": ["body", "executiondate"], + "msg": "Extra inputs are not permitted", + "type": "extra_forbidden", + } + ] + }, + ), + ( + {"logical_date": "2020-11-10T08:25:56"}, + { + "detail": [ + { + "input": "2020-11-10T08:25:56", + "loc": ["body", "logical_date"], + "msg": "Input should have timezone info", + "type": "timezone_aware", + } + ] + }, + ), + ( + {"data_interval_start": "2020-11-10T08:25:56"}, + { + "detail": [ + { + "input": "2020-11-10T08:25:56", + "loc": ["body", "data_interval_start"], + "msg": "Input should have timezone info", + "type": "timezone_aware", + } + ] + }, + ), + ( + {"data_interval_end": "2020-11-10T08:25:56"}, + { + "detail": [ + { + "input": "2020-11-10T08:25:56", + "loc": ["body", "data_interval_end"], + "msg": "Input should have timezone info", + "type": "timezone_aware", + } + ] + }, + ), + ( + {"dag_run_id": 20}, + { + "detail": [ + { + "input": 20, + "loc": ["body", "dag_run_id"], + "msg": "Input should be a valid string", + "type": "string_type", + } + ] + }, + ), + ( + {"note": 20}, + { + "detail": [ + { + "input": 20, + "loc": ["body", "note"], + "msg": "Input should be a valid string", + "type": "string_type", + } + ] + }, + ), + ( + {"conf": 20}, + { + "detail": [ + { + "input": 20, + "loc": ["body", "conf"], + "msg": "Input should be a valid dictionary", + "type": "dict_type", + } + ] + }, + ), + ], + ) + def test_invalid_data(self, test_client, post_body, expected_detail): + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json=post_body) + assert response.status_code == 422 + assert response.json() == expected_detail + + @mock.patch("airflow.models.DAG.create_dagrun") + def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, test_client): + error_message = "Encountered Error" + + mock_create_dagrun.side_effect = ValueError(error_message) + + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={}) + assert response.status_code == 400 + assert response.json() == {"detail": error_message} + + def test_should_respond_404_if_a_dag_is_inactive(self, test_client, session): + self._dags_for_trigger_tests(session) + response = test_client.post("/public/dags/inactive/dagRuns", json={}) + assert response.status_code == 404 + assert response.json()["detail"] == "DAG with dag_id: 'inactive' not found" + + def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, session): + self._dags_for_trigger_tests(session) + response = test_client.post("/public/dags/import_errors/dagRuns", json={}) + assert response.status_code == 400 + assert ( + response.json()["detail"] + == "DAG with dag_id: 'import_errors' has import errors and cannot be triggered" + ) + + @time_machine.travel(timezone.utcnow(), tick=False) + def test_should_response_200_for_duplicate_logical_date(self, test_client): + RUN_ID = "random_1" + logical_date = LOGICAL_DATE1.isoformat().replace("+00:00", "Z") + note = "duplicate logical date test" + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", + json={"logical_date": logical_date, "dag_run_id": RUN_ID, "note": note}, + ) + now = timezone.utcnow().isoformat().replace("+00:00", "Z") + assert response.status_code == 200 + body = response.json() + assert body == { + "run_id": RUN_ID, + "dag_id": DAG1_ID, + "logical_date": logical_date, + "queued_at": now, + "start_date": None, + "end_date": None, + "data_interval_start": logical_date, + "data_interval_end": logical_date, + "last_scheduling_decision": None, + "run_type": "manual", + "state": "queued", + "external_trigger": True, + "triggered_by": "rest_api", + "conf": {}, + "note": note, + } + + @pytest.mark.parametrize( + "data_interval_start, data_interval_end", + [ + ( + LOGICAL_DATE1.isoformat(), + None, + ), + ( + None, + LOGICAL_DATE1.isoformat(), + ), + ], + ) + def test_should_response_422_for_missing_start_date_or_end_date( + self, test_client, data_interval_start, data_interval_end + ): + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", + json={"data_interval_start": data_interval_start, "data_interval_end": data_interval_end}, + ) + assert response.status_code == 422 + assert ( + response.json()["detail"] + == "Either both data_interval_start and data_interval_end must be provided or both must be None" + ) + + def test_response_404(self, test_client): + response = test_client.post("/public/dags/randoms/dagRuns", json={}) + assert response.status_code == 404 + assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found" + + def test_response_409(self, test_client): + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID}) + assert response.status_code == 409 + assert ( + response.json()["detail"] + == "DAGRun with DAG ID: 'test_dag1' and DAGRun ID: 'dag_run_1' already exists" + ) From c8ee434dc1dd4bd0161c89fd30c7a120f73c76fb Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 22:18:42 +0530 Subject: [PATCH 07/15] remove logical_date from post body --- airflow/api_fastapi/core_api/datamodels/dag_run.py | 7 +++---- .../api_fastapi/core_api/openapi/v1-generated.yaml | 6 ------ airflow/ui/openapi-gen/requests/schemas.gen.ts | 12 ------------ airflow/ui/openapi-gen/requests/types.gen.ts | 1 - 4 files changed, 3 insertions(+), 23 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 5e7eb006875a8..c3261ce90e3b1 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -81,7 +81,6 @@ class TriggerDAGRunPostBody(BaseModel): """Trigger DAG Run Serializer for POST body.""" dag_run_id: str | None = None - logical_date: AwareDatetime | None = None data_interval_start: AwareDatetime | None = None data_interval_end: AwareDatetime | None = None @@ -102,11 +101,11 @@ def check_data_intervals(cls, values): @model_validator(mode="after") def validate_dag_run_id(self): if not self.dag_run_id: - self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self._logical_date) + self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date) return self # Mypy issue https://github.com/python/mypy/issues/1362 @computed_field # type: ignore[misc] @property - def _logical_date(self) -> datetime: - return self.logical_date or timezone.utcnow() + def logical_date(self) -> datetime: + return timezone.utcnow() diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 6f99c923ce28e..0f20663246642 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7518,12 +7518,6 @@ components: - type: string - type: 'null' title: Dag Run Id - logical_date: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Logical Date data_interval_start: anyOf: - type: string diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e3664b2b96992..3e33cbfae58a5 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4326,18 +4326,6 @@ export const $TriggerDAGRunPostBody = { ], title: "Dag Run Id", }, - logical_date: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Logical Date", - }, data_interval_start: { anyOf: [ { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 061bdeaed8393..0fd4860de4f54 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1000,7 +1000,6 @@ export type TimeDelta = { */ export type TriggerDAGRunPostBody = { dag_run_id?: string | null; - logical_date?: string | null; data_interval_start?: string | null; data_interval_end?: string | null; conf?: { From e6964b87d530bbf1b3cfe58fcf9cb3fcf3de443b Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 22:34:03 +0530 Subject: [PATCH 08/15] remove logical_date from tests --- .../core_api/routes/public/dag_run.py | 2 +- .../core_api/routes/public/test_dag_run.py | 85 +++++++++---------- 2 files changed, 42 insertions(+), 45 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 96b8eccc5eec5..c4741d296cfb2 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -334,7 +334,7 @@ def trigger_dag_run( ) run_id = body.dag_run_id - logical_date = pendulum.instance(body._logical_date) + logical_date = pendulum.instance(body.logical_date) dagrun_instance = session.scalar( select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id).limit(1) ) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 3376f51d3a2a5..cc9dffb95d192 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -689,25 +689,22 @@ def _dags_for_trigger_tests(self, session=None): @time_machine.travel(timezone.utcnow(), tick=False) @pytest.mark.parametrize( - "dag_run_id, logical_date, note, data_interval_start, data_interval_end", + "dag_run_id, note, data_interval_start, data_interval_end", [ - ("dag_run_5", "2020-06-11T18:00:00+00:00", "test-note", None, None), + ("dag_run_5", "test-note", None, None), ( "dag_run_6", - "2024-06-11T18:00:00+00:00", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00", ), - (None, "2020-06-11T18:00:00+00:00", None, None, None), - (None, None, None, None, None), + (None, None, None, None), ], ) def test_should_respond_200( self, test_client, dag_run_id, - logical_date, note, data_interval_start, data_interval_end, @@ -715,8 +712,6 @@ def test_should_respond_200( fixed_now = timezone.utcnow().isoformat() request_json = {"note": note} - if logical_date is not None: - request_json["logical_date"] = logical_date if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id if data_interval_start is not None: @@ -728,26 +723,20 @@ def test_should_respond_200( f"/public/dags/{DAG1_ID}/dagRuns", json={ "dag_run_id": dag_run_id, - "logical_date": logical_date, "note": note, "data_interval_start": data_interval_start, "data_interval_end": data_interval_end, }, ) - assert response.status_code == 200 - if logical_date is None: - expected_logical_date = fixed_now - else: - expected_logical_date = logical_date if dag_run_id is None: - expected_dag_run_id = f"manual__{expected_logical_date}" + expected_dag_run_id = f"manual__{fixed_now}" else: expected_dag_run_id = dag_run_id - expected_data_interval_start = expected_logical_date.replace("+00:00", "Z") - expected_data_interval_end = expected_logical_date.replace("+00:00", "Z") + expected_data_interval_start = fixed_now.replace("+00:00", "Z") + expected_data_interval_end = fixed_now.replace("+00:00", "Z") if data_interval_start is not None and data_interval_end is not None: expected_data_interval_start = data_interval_start.replace("+00:00", "Z") expected_data_interval_end = data_interval_end.replace("+00:00", "Z") @@ -757,7 +746,7 @@ def test_should_respond_200( "dag_id": DAG1_ID, "run_id": expected_dag_run_id, "end_date": None, - "logical_date": expected_logical_date.replace("+00:00", "Z"), + "logical_date": fixed_now.replace("+00:00", "Z"), "external_trigger": True, "start_date": None, "state": "queued", @@ -795,8 +784,8 @@ def test_should_respond_200( { "input": "2020-11-10T08:25:56", "loc": ["body", "logical_date"], - "msg": "Input should have timezone info", - "type": "timezone_aware", + "msg": "Extra inputs are not permitted", + "type": "extra_forbidden", } ] }, @@ -900,33 +889,41 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio @time_machine.travel(timezone.utcnow(), tick=False) def test_should_response_200_for_duplicate_logical_date(self, test_client): - RUN_ID = "random_1" - logical_date = LOGICAL_DATE1.isoformat().replace("+00:00", "Z") + RUN_ID_1 = "random_1" + RUN_ID_2 = "random_2" + now = timezone.utcnow().isoformat().replace("+00:00", "Z") note = "duplicate logical date test" - response = test_client.post( + response_1 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"logical_date": logical_date, "dag_run_id": RUN_ID, "note": note}, + json={"dag_run_id": RUN_ID_1, "note": note}, ) - now = timezone.utcnow().isoformat().replace("+00:00", "Z") - assert response.status_code == 200 - body = response.json() - assert body == { - "run_id": RUN_ID, - "dag_id": DAG1_ID, - "logical_date": logical_date, - "queued_at": now, - "start_date": None, - "end_date": None, - "data_interval_start": logical_date, - "data_interval_end": logical_date, - "last_scheduling_decision": None, - "run_type": "manual", - "state": "queued", - "external_trigger": True, - "triggered_by": "rest_api", - "conf": {}, - "note": note, - } + response_2 = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", + json={"dag_run_id": RUN_ID_2, "note": note}, + ) + + assert response_1.status_code == response_2.status_code == 200 + body1 = response_1.json() + body2 = response_2.json() + + for each_run_id, each_body in [(RUN_ID_1, body1), (RUN_ID_2, body2)]: + assert each_body == { + "run_id": each_run_id, + "dag_id": DAG1_ID, + "logical_date": now, + "queued_at": now, + "start_date": None, + "end_date": None, + "data_interval_start": now, + "data_interval_end": now, + "last_scheduling_decision": None, + "run_type": "manual", + "state": "queued", + "external_trigger": True, + "triggered_by": "rest_api", + "conf": {}, + "note": note, + } @pytest.mark.parametrize( "data_interval_start, data_interval_end", From b82b1aec1fb256a369484a1ea6518724d42491a1 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 22:44:29 +0530 Subject: [PATCH 09/15] fix --- airflow/api_fastapi/core_api/datamodels/dag_run.py | 2 +- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 0bb75b40aa81f..11b18029152c9 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -21,7 +21,7 @@ from enum import Enum from fastapi import HTTPException, status -from pydantic import AwareDatetime, BaseModel, Field, computed_field, model_validator +from pydantic import AwareDatetime, Field, computed_field, model_validator from airflow.api_fastapi.core_api.base import BaseModel from airflow.models import DagRun diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 224d23b506c93..333734797e002 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -362,7 +362,7 @@ def trigger_dag_run( if dag_run_note: current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 dag_run.note = (dag_run_note, current_user_id) - return DAGRunResponse.model_validate(dag_run, from_attributes=True) + return DAGRunResponse.model_validate(dag_run) except ValueError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) From d2d45bd6ce420ad3f462db55e35189f8d69d50bd Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 22:54:46 +0530 Subject: [PATCH 10/15] include return type --- airflow/api_fastapi/core_api/openapi/v1-generated.yaml | 3 ++- airflow/api_fastapi/core_api/routes/public/dag_run.py | 4 ++-- airflow/ui/openapi-gen/queries/queries.ts | 2 +- airflow/ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++-- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 5250203b4e8ad..626335020ae93 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1851,7 +1851,8 @@ paths: description: Successful Response content: application/json: - schema: {} + schema: + $ref: '#/components/schemas/DAGRunResponse' '401': content: application/json: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 333734797e002..756d72a12e11a 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -316,7 +316,7 @@ def get_dag_runs( ) def trigger_dag_run( dag_id, body: TriggerDAGRunPostBody, request: Request, session: Annotated[Session, Depends(get_session)] -): +) -> DAGRunResponse: """Trigger a DAG.""" dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1)) if not dm: @@ -362,7 +362,7 @@ def trigger_dag_run( if dag_run_note: current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 dag_run.note = (dag_run_note, current_user_id) - return DAGRunResponse.model_validate(dag_run) + return dag_run except ValueError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 5ed9331ae24c3..b147d4382d4fc 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2423,7 +2423,7 @@ export const useDagRunServiceClearDagRun = < * @param data The data for the request. * @param data.dagId * @param data.requestBody - * @returns unknown Successful Response + * @returns DAGRunResponse Successful Response * @throws ApiError */ export const useDagRunServiceTriggerDagRun = < diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 8499840a3f939..aaa2997e04580 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1185,7 +1185,7 @@ export class DagRunService { * @param data The data for the request. * @param data.dagId * @param data.requestBody - * @returns unknown Successful Response + * @returns DAGRunResponse Successful Response * @throws ApiError */ public static triggerDagRun( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 08456c80666e3..481c4ea4c8509 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1439,7 +1439,7 @@ export type TriggerDagRunData = { requestBody: TriggerDAGRunPostBody; }; -export type TriggerDagRunResponse = unknown; +export type TriggerDagRunResponse = DAGRunResponse; export type GetDagSourceData = { accept?: "application/json" | "text/plain" | "*/*"; @@ -2720,7 +2720,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: unknown; + 200: DAGRunResponse; /** * Bad Request */ From f730881d48ef4c6ceb3dc87e2752a6b69b4bb08b Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 22 Nov 2024 23:52:05 +0530 Subject: [PATCH 11/15] fix conf --- airflow/api_fastapi/core_api/datamodels/dag_run.py | 2 +- .../api_fastapi/core_api/openapi/v1-generated.yaml | 10 ++-------- airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 ++--------- airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 4 files changed, 6 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 11b18029152c9..7a7681e63a7b3 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -85,7 +85,7 @@ class TriggerDAGRunPostBody(BaseModel): data_interval_start: AwareDatetime | None = None data_interval_end: AwareDatetime | None = None - conf: dict | None = Field(default_factory=dict) + conf: dict = Field(default_factory=dict) note: str | None = None model_config = {"extra": "forbid"} diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 626335020ae93..1d710ec7bcd92 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4123,8 +4123,6 @@ paths: in: path required: true schema: - enum: - - '~' const: '~' type: string title: Dag Id @@ -4132,8 +4130,6 @@ paths: in: path required: true schema: - enum: - - '~' const: '~' type: string title: Dag Run Id @@ -7928,7 +7924,7 @@ components: properties: __type: type: string - title: ' Type' + title: Type default: TimeDelta days: type: integer @@ -7966,9 +7962,7 @@ components: - type: 'null' title: Data Interval End conf: - anyOf: - - type: object - - type: 'null' + type: object title: Conf note: anyOf: diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 3c3f570d37533..4cd8bd1f12a7b 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4579,7 +4579,7 @@ export const $TimeDelta = { properties: { __type: { type: "string", - title: " Type", + title: "Type", default: "TimeDelta", }, days: { @@ -4640,14 +4640,7 @@ export const $TriggerDAGRunPostBody = { title: "Data Interval End", }, conf: { - anyOf: [ - { - type: "object", - }, - { - type: "null", - }, - ], + type: "object", title: "Conf", }, note: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 481c4ea4c8509..0d281a6e48ab2 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1082,7 +1082,7 @@ export type TriggerDAGRunPostBody = { data_interval_end?: string | null; conf?: { [key: string]: unknown; - } | null; + }; note?: string | null; }; From c30a410af81f0ae36106ebff5c10b716fa7fb62f Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 26 Nov 2024 10:25:36 +0530 Subject: [PATCH 12/15] feedback --- .../core_api/datamodels/dag_run.py | 8 +- .../core_api/openapi/v1-generated.yaml | 1 - .../core_api/routes/public/dag_run.py | 74 ++++++++--------- .../ui/openapi-gen/requests/schemas.gen.ts | 1 - .../core_api/routes/public/test_dag_run.py | 82 ++++++++++--------- 5 files changed, 81 insertions(+), 85 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 7a7681e63a7b3..ca2f02a949766 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,7 +20,6 @@ from datetime import datetime from enum import Enum -from fastapi import HTTPException, status from pydantic import AwareDatetime, Field, computed_field, model_validator from airflow.api_fastapi.core_api.base import BaseModel @@ -88,14 +87,11 @@ class TriggerDAGRunPostBody(BaseModel): conf: dict = Field(default_factory=dict) note: str | None = None - model_config = {"extra": "forbid"} - @model_validator(mode="after") def check_data_intervals(cls, values): if (values.data_interval_start is None) != (values.data_interval_end is None): - raise HTTPException( - status.HTTP_422_UNPROCESSABLE_ENTITY, - "Either both data_interval_start and data_interval_end must be provided or both must be None", + raise ValueError( + "Either both data_interval_start and data_interval_end must be provided or both must be None" ) return values diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 1d710ec7bcd92..9d01629d0ff97 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -7969,7 +7969,6 @@ components: - type: string - type: 'null' title: Note - additionalProperties: false type: object title: TriggerDAGRunPostBody description: Trigger DAG Run Serializer for POST body. diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 756d72a12e11a..72514c5e8cf7a 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -22,6 +22,7 @@ import pendulum from fastapi import Depends, HTTPException, Query, Request, status from sqlalchemy import select +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from airflow.api.common.mark_tasks import ( @@ -53,6 +54,7 @@ TaskInstanceResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import ParamValidationError from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval @@ -330,43 +332,41 @@ def trigger_dag_run( run_id = body.dag_run_id logical_date = pendulum.instance(body.logical_date) - dagrun_instance = session.scalar( - select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id).limit(1) - ) - if not dagrun_instance: - try: - dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + try: + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) - if body.data_interval_start and body.data_interval_end: - data_interval = DataInterval( - start=pendulum.instance(body.data_interval_start), - end=pendulum.instance(body.data_interval_end), - ) - else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - dag_version = DagVersion.get_latest_version(dag.dag_id) - dag_run = dag.create_dagrun( - run_type=DagRunType.MANUAL, - run_id=run_id, - logical_date=logical_date, - data_interval=data_interval, - state=DagRunState.QUEUED, - conf=body.conf, - external_trigger=True, - dag_version=dag_version, - session=session, - triggered_by=DagRunTriggeredByType.REST_API, + if body.data_interval_start and body.data_interval_end: + data_interval = DataInterval( + start=pendulum.instance(body.data_interval_start), + end=pendulum.instance(body.data_interval_end), ) - dag_run_note = body.note - if dag_run_note: - current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 - dag_run.note = (dag_run_note, current_user_id) - return dag_run - except ValueError as e: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) - - raise HTTPException( - status.HTTP_409_CONFLICT, - f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{body.dag_run_id}' already exists", - ) + else: + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + dag_version = DagVersion.get_latest_version(dag.dag_id) + dag_run = dag.create_dagrun( + run_type=DagRunType.MANUAL, + run_id=run_id, + logical_date=logical_date, + data_interval=data_interval, + state=DagRunState.QUEUED, + conf=body.conf, + external_trigger=True, + dag_version=dag_version, + session=session, + triggered_by=DagRunTriggeredByType.REST_API, + ) + dag_run_note = body.note + if dag_run_note: + current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 + dag_run.note = (dag_run_note, current_user_id) + return dag_run + except ValueError as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) + except IntegrityError: + raise HTTPException( + status.HTTP_409_CONFLICT, + f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists", + ) + except ParamValidationError as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4cd8bd1f12a7b..700dc91fc793f 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4655,7 +4655,6 @@ export const $TriggerDAGRunPostBody = { title: "Note", }, }, - additionalProperties: false, type: "object", title: "TriggerDAGRunPostBody", description: "Trigger DAG Run Serializer for POST body.", diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index cc9dffb95d192..329c41c799fbc 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -26,6 +26,7 @@ from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent, AssetModel +from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.sdk.definitions.asset import Asset from airflow.utils import timezone @@ -62,6 +63,7 @@ LOGICAL_DATE3 = datetime(2024, 5, 16, 0, 0, tzinfo=timezone.utc) LOGICAL_DATE4 = datetime(2024, 5, 25, 0, 0, tzinfo=timezone.utc) DAG1_RUN1_NOTE = "test_note" +DAG2_PARAM = {"validated_number": Param(1, minimum=1, maximum=10)} @pytest.fixture(autouse=True) @@ -102,11 +104,7 @@ def setup(request, dag_maker, session=None): logical_date=LOGICAL_DATE2, ) - with dag_maker( - DAG2_ID, - schedule=None, - start_date=START_DATE2, - ): + with dag_maker(DAG2_ID, schedule=None, start_date=START_DATE2, params=DAG2_PARAM): EmptyOperator(task_id="task_2") dag_maker.create_dagrun( run_id=DAG2_RUN1_ID, @@ -660,7 +658,6 @@ def test_clear_dag_run_unprocessable_entity(self, test_client): assert body["detail"][0]["loc"][0] == "body" -# @pytest.mark.no_setup class TestTriggerDagRun: def _dags_for_trigger_tests(self, session=None): inactive_dag = DagModel( @@ -721,12 +718,7 @@ def test_should_respond_200( response = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={ - "dag_run_id": dag_run_id, - "note": note, - "data_interval_start": data_interval_start, - "data_interval_end": data_interval_end, - }, + json=request_json, ) assert response.status_code == 200 @@ -764,32 +756,33 @@ def test_should_respond_200( @pytest.mark.parametrize( "post_body, expected_detail", [ - ( - {"executiondate": "2020-11-10T08:25:56Z"}, - { - "detail": [ - { - "input": "2020-11-10T08:25:56Z", - "loc": ["body", "executiondate"], - "msg": "Extra inputs are not permitted", - "type": "extra_forbidden", - } - ] - }, - ), - ( - {"logical_date": "2020-11-10T08:25:56"}, - { - "detail": [ - { - "input": "2020-11-10T08:25:56", - "loc": ["body", "logical_date"], - "msg": "Extra inputs are not permitted", - "type": "extra_forbidden", - } - ] - }, - ), + # Uncomment these 2 test cases once https://github.com/apache/airflow/pull/44306 is merged + # ( + # {"executiondate": "2020-11-10T08:25:56Z"}, + # { + # "detail": [ + # { + # "input": "2020-11-10T08:25:56Z", + # "loc": ["body", "executiondate"], + # "msg": "Extra inputs are not permitted", + # "type": "extra_forbidden", + # } + # ] + # }, + # ), + # ( + # {"logical_date": "2020-11-10T08:25:56"}, + # { + # "detail": [ + # { + # "input": "2020-11-10T08:25:56", + # "loc": ["body", "logical_date"], + # "msg": "Extra inputs are not permitted", + # "type": "extra_forbidden", + # } + # ] + # }, + # ), ( {"data_interval_start": "2020-11-10T08:25:56"}, { @@ -947,9 +940,18 @@ def test_should_response_422_for_missing_start_date_or_end_date( ) assert response.status_code == 422 assert ( - response.json()["detail"] - == "Either both data_interval_start and data_interval_end must be provided or both must be None" + response.json()["detail"][0]["msg"] + == "Value error, Either both data_interval_start and data_interval_end must be provided or both must be None" + ) + + def test_raises_validation_error_for_invalid_params(self, test_client): + response = test_client.post( + f"/public/dags/{DAG2_ID}/dagRuns", + json={"conf": {"validated_number": 5000}}, ) + # breakpoint() + assert response.status_code == 400 + assert "Invalid input for param validated_number" in response.json()["detail"] def test_response_404(self, test_client): response = test_client.post("/public/dags/randoms/dagRuns", json={}) From a1c50dc20bbe557006e8d0b28cb2007afa98c8d3 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 26 Nov 2024 11:42:02 +0530 Subject: [PATCH 13/15] fix tests --- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 2d7882308c531..071de56b744d9 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -736,7 +736,7 @@ def test_should_respond_200( expected_response_json = { "conf": {}, "dag_id": DAG1_ID, - "run_id": expected_dag_run_id, + "dag_run_id": expected_dag_run_id, "end_date": None, "logical_date": fixed_now.replace("+00:00", "Z"), "external_trigger": True, @@ -901,7 +901,7 @@ def test_should_response_200_for_duplicate_logical_date(self, test_client): for each_run_id, each_body in [(RUN_ID_1, body1), (RUN_ID_2, body2)]: assert each_body == { - "run_id": each_run_id, + "dag_run_id": each_run_id, "dag_id": DAG1_ID, "logical_date": now, "queued_at": now, From b1911c013f8276378b0b19d9420819b8ce4513d7 Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Tue, 26 Nov 2024 12:38:56 +0530 Subject: [PATCH 14/15] Update tests/api_fastapi/core_api/routes/public/test_dag_run.py --- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 071de56b744d9..3c677529a0313 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -949,7 +949,6 @@ def test_raises_validation_error_for_invalid_params(self, test_client): f"/public/dags/{DAG2_ID}/dagRuns", json={"conf": {"validated_number": 5000}}, ) - # breakpoint() assert response.status_code == 400 assert "Invalid input for param validated_number" in response.json()["detail"] From c1f6668fa4d802ac049e15839eaa9eb012d3bad8 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 26 Nov 2024 16:25:40 +0530 Subject: [PATCH 15/15] feedback --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 6 ------ tests/api_fastapi/core_api/routes/public/test_dag_run.py | 5 +---- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 209ff333e8b56..1e95f75273c16 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -22,7 +22,6 @@ import pendulum from fastapi import Depends, HTTPException, Query, Request, status from sqlalchemy import select -from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from airflow.api.common.mark_tasks import ( @@ -368,11 +367,6 @@ def trigger_dag_run( return dag_run except ValueError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) - except IntegrityError: - raise HTTPException( - status.HTTP_409_CONFLICT, - f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists", - ) except ParamValidationError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 933b3765a36b4..cf3a3b1a12a52 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1348,7 +1348,4 @@ def test_response_404(self, test_client): def test_response_409(self, test_client): response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID}) assert response.status_code == 409 - assert ( - response.json()["detail"] - == "DAGRun with DAG ID: 'test_dag1' and DAGRun ID: 'dag_run_1' already exists" - ) + assert response.json()["detail"] == "Unique constraint violation"