From e1e314997a0005544857c93665b1524340282d2d Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 19 Jun 2025 15:56:19 +0800 Subject: [PATCH 1/5] Add endpoint to watch dag run until finish --- .../core_api/datamodels/dag_run.py | 7 +++ .../openapi/v2-rest-api-generated.yaml | 60 +++++++++++++++++++ .../core_api/routes/public/dag_run.py | 40 ++++++++++++- airflow-core/src/airflow/settings.py | 47 ++++++++------- .../airflow/ui/openapi-gen/queries/common.ts | 8 +++ .../ui/openapi-gen/queries/ensureQueryData.ts | 15 +++++ .../ui/openapi-gen/queries/prefetch.ts | 15 +++++ .../airflow/ui/openapi-gen/queries/queries.ts | 15 +++++ .../ui/openapi-gen/queries/suspense.ts | 15 +++++ .../ui/openapi-gen/requests/services.gen.ts | 32 +++++++++- .../ui/openapi-gen/requests/types.gen.ts | 35 +++++++++++ .../core_api/routes/public/test_dag_run.py | 42 +++++++++++++ 12 files changed, 306 insertions(+), 25 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index e184a3acfe864..03e87c478ff5e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -164,3 +164,10 @@ class DAGRunsBatchBody(StrictBaseModel): start_date_lte: AwareDatetime | None = None end_date_gte: AwareDatetime | None = None end_date_lte: AwareDatetime | None = None + + +class DAGRunWatchResult(StrictBaseModel): + """Status update from the dag run watch endpoint.""" + + duration: float | None + state: DagRunState diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index c0b2e81c71815..b1cf8ed71b020 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2147,6 +2147,66 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch: + get: + tags: + - DagRun + summary: Watch Dag Run Until Finished + description: Watch a dag run until it reaches a finished state (e.g. success + or failed). + operationId: watch_dag_run_until_finished + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: interval + in: query + required: true + schema: + type: number + exclusiveMinimum: 0.0 + title: Interval + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/list: post: tags: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 8f9b11b3de78a..9432fbbeb864f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -17,11 +17,14 @@ from __future__ import annotations +import asyncio +from collections.abc import AsyncGenerator from typing import Annotated, Literal, cast import structlog from fastapi import Depends, HTTPException, Query, status from fastapi.exceptions import RequestValidationError +from fastapi.responses import StreamingResponse from pydantic import ValidationError from sqlalchemy import select from sqlalchemy.orm import joinedload @@ -59,6 +62,7 @@ DAGRunPatchStates, DAGRunResponse, DAGRunsBatchBody, + DAGRunWatchResult, TriggerDAGRunPostBody, ) from airflow.api_fastapi.core_api.datamodels.task_instances import ( @@ -76,7 +80,8 @@ from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager from airflow.models import DAG, DagModel, DagRun -from airflow.utils.state import DagRunState +from airflow.utils.session import create_session_async +from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunTriggeredByType, DagRunType log = structlog.get_logger(__name__) @@ -438,6 +443,39 @@ def trigger_dag_run( raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) +async def _watch_dagrun(dag_id: str, run_id: str, interval: float) -> AsyncGenerator[str, None]: + async with create_session_async() as session: + dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id)) + yield DAGRunWatchResult.model_validate(dag_run, from_attributes=True).model_dump_json() + yield "\n" + while dag_run.state not in State.finished_dr_states: + await asyncio.sleep(interval) + async with create_session_async() as session: + dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id)) + yield DAGRunWatchResult.model_validate(dag_run, from_attributes=True).model_dump_json() + yield "\n" + + +@dag_run_router.get( + "/{dag_run_id}/watch", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN))], +) +def watch_dag_run_until_finished( + dag_id: str, + dag_run_id: str, + interval: Annotated[float, Query(gt=0.0)], + session: SessionDep, +): + "Watch a dag run until it reaches a finished state (e.g. success or failed)." + if not session.scalar(select(1).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)): + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + ) + return StreamingResponse(_watch_dagrun(dag_id, dag_run_id, interval)) + + @dag_run_router.post( "/list", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index 689ce2e4e6819..ca8de146775e2 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -22,7 +22,6 @@ import json import logging import os -import platform import sys import warnings from collections.abc import Callable @@ -321,6 +320,20 @@ def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool: return True +def _configure_async_session(): + global async_engine + global AsyncSession + + async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True) + AsyncSession = sessionmaker( + bind=async_engine, + autocommit=False, + autoflush=False, + class_=SAAsyncSession, + expire_on_commit=False, + ) + + def configure_orm(disable_connection_pool=False, pool_class=None): """Configure ORM using SQLAlchemy.""" from airflow.sdk.execution_time.secrets_masker import mask_secret @@ -335,8 +348,6 @@ def configure_orm(disable_connection_pool=False, pool_class=None): global Session global engine - global async_engine - global AsyncSession global NonScopedSession if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": @@ -359,34 +370,24 @@ def configure_orm(disable_connection_pool=False, pool_class=None): connect_args["check_same_thread"] = False engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True) - async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True) - AsyncSession = sessionmaker( - bind=async_engine, - autocommit=False, - autoflush=False, - class_=SAAsyncSession, - expire_on_commit=False, - ) mask_secret(engine.url.password) - setup_event_handlers(engine) if conf.has_option("database", "sql_alchemy_session_maker"): _session_maker = conf.getimport("database", "sql_alchemy_session_maker") else: - - def _session_maker(_engine): - return sessionmaker( - autocommit=False, - autoflush=False, - bind=_engine, - expire_on_commit=False, - ) - + _session_maker = functools.partial( + sessionmaker, + autocommit=False, + autoflush=False, + expire_on_commit=False, + ) NonScopedSession = _session_maker(engine) Session = scoped_session(NonScopedSession) - if not platform.system() == "Windows": + _configure_async_session() + + if register_at_fork := getattr(os, "register_at_fork", None): # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork def clean_in_fork(): _globals = globals() @@ -396,7 +397,7 @@ def clean_in_fork(): async_engine.sync_engine.dispose(close=False) # Won't work on Windows - os.register_at_fork(after_in_child=clean_in_fork) + register_at_fork(after_in_child=clean_in_fork) DEFAULT_ENGINE_ARGS = { diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 4c76141a70d93..6f02e1e52bc4a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -159,6 +159,14 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, endDateGte, endDateLte, updatedAtGte?: string; updatedAtLte?: string; }, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }])]; +export type DagRunServiceWatchDagRunUntilFinishedDefaultResponse = Awaited>; +export type DagRunServiceWatchDagRunUntilFinishedQueryResult = UseQueryResult; +export const useDagRunServiceWatchDagRunUntilFinishedKey = "DagRunServiceWatchDagRunUntilFinished"; +export const UseDagRunServiceWatchDagRunUntilFinishedKeyFn = ({ dagId, dagRunId, interval }: { + dagId: string; + dagRunId: string; + interval: number; +}, queryKey?: Array) => [useDagRunServiceWatchDagRunUntilFinishedKey, ...(queryKey ?? [{ dagId, dagRunId, interval }])]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited>; export type DagSourceServiceGetDagSourceQueryResult = UseQueryResult; export const useDagSourceServiceGetDagSourceKey = "DagSourceServiceGetDagSource"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 99e3a1f56aaac..3514c1a92a8ac 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -296,6 +296,21 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtLte?: string; }) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** +* Watch Dag Run Until Finished +* Watch a dag run until it reaches a finished state (e.g. success or failed). +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval +* @returns unknown Successful Response +* @throws ApiError +*/ +export const ensureUseDagRunServiceWatchDagRunUntilFinishedData = (queryClient: QueryClient, { dagId, dagRunId, interval }: { + dagId: string; + dagRunId: string; + interval: number; +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index aff2296830eda..aded3dc6f41ec 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -296,6 +296,21 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d updatedAtLte?: string; }) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** +* Watch Dag Run Until Finished +* Watch a dag run until it reaches a finished state (e.g. success or failed). +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval +* @returns unknown Successful Response +* @throws ApiError +*/ +export const prefetchUseDagRunServiceWatchDagRunUntilFinished = (queryClient: QueryClient, { dagId, dagRunId, interval }: { + dagId: string; + dagRunId: string; + interval: number; +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 86294553c8dd2..24f9cc099f5ed 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -296,6 +296,21 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** +* Watch Dag Run Until Finished +* Watch a dag run until it reaches a finished state (e.g. success or failed). +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useDagRunServiceWatchDagRunUntilFinished = = unknown[]>({ dagId, dagRunId, interval }: { + dagId: string; + dagRunId: string; + interval: number; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) as TData, ...options }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 6ce893418107b..1d3b939230905 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -296,6 +296,21 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** +* Watch Dag Run Until Finished +* Watch a dag run until it reaches a finished state (e.g. success or failed). +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useDagRunServiceWatchDagRunUntilFinishedSuspense = = unknown[]>({ dagId, dagRunId, interval }: { + dagId: string; + dagRunId: string; + interval: number; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) as TData, ...options }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index d3dc37b8e662d..e0b57af9852ce 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WatchDagRunUntilFinishedData, WatchDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; export class AssetService { /** @@ -1050,6 +1050,36 @@ export class DagRunService { }); } + /** + * Watch Dag Run Until Finished + * Watch a dag run until it reaches a finished state (e.g. success or failed). + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.interval + * @returns unknown Successful Response + * @throws ApiError + */ + public static watchDagRunUntilFinished(data: WatchDagRunUntilFinishedData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId + }, + query: { + interval: data.interval + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Get List Dag Runs Batch * Get a list of DAG Runs. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index b1590d732fe23..814e4f6fd1196 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2216,6 +2216,14 @@ export type TriggerDagRunData = { export type TriggerDagRunResponse = DAGRunResponse; +export type WatchDagRunUntilFinishedData = { + dagId: string; + dagRunId: string; + interval: number; +}; + +export type WatchDagRunUntilFinishedResponse = unknown; + export type GetListDagRunsBatchData = { dagId: "~"; requestBody: DAGRunsBatchBody; @@ -3979,6 +3987,33 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch': { + get: { + req: WatchDagRunUntilFinishedData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dags/{dag_id}/dagRuns/list': { post: { req: GetListDagRunsBatchData; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index acb054cb3f9a6..74a370706173b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1621,3 +1621,45 @@ def test_should_respond_200_with_null_logical_date(self, test_client): "conf": {}, "note": None, } + + +class TestWatchDagRun: + # The way we init async engine does not work well with FastAPI app init. + # Creating the engine implicitly creates an event loop, which Airflow does + # once for the entire process; creating the FastAPI app also does, but our + # test setup does it once for each test. I don't know how to properly fix + # this without rewriting how Airflow does db; re-configuring the db for each + # test at least makes the tests run correctly. + @pytest.fixture(autouse=True) + def reconfigure_async_db_engine(self): + from airflow.settings import _configure_async_session + + _configure_async_session() + + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/watch?interval=1") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/watch?interval=1") + assert response.status_code == 403 + + def test_should_respond_404(self, test_client): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/watch?interval=1") + assert response.status_code == 404 + + def test_should_respond_422_without_interval_param(self, test_client): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/watch") + assert response.status_code == 422 + + @pytest.mark.parametrize( + "run_id, state", + [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)], + ) + def test_should_respond_200_immediately_for_finished_run(self, test_client, run_id, state): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/watch?interval=100") + assert response.status_code == 200 + data = response.json() + assert isinstance(data, dict) + assert sorted(data) == ["duration", "state"] + assert data["state"] == state From edf8787da55469dde0f51a5fe56eb3ef472ce86a Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 26 Jun 2025 17:15:19 +0800 Subject: [PATCH 2/5] Include xcom in result if requested --- .../core_api/datamodels/dag_run.py | 7 -- .../openapi/v2-rest-api-generated.yaml | 31 +++++-- .../core_api/routes/public/dag_run.py | 62 ++++++++------ .../core_api/services/public/dag_run.py | 85 +++++++++++++++++++ .../airflow/ui/openapi-gen/queries/common.ts | 11 +-- .../ui/openapi-gen/queries/ensureQueryData.ts | 12 +-- .../ui/openapi-gen/queries/prefetch.ts | 12 +-- .../airflow/ui/openapi-gen/queries/queries.ts | 12 +-- .../ui/openapi-gen/queries/suspense.ts | 12 +-- .../ui/openapi-gen/requests/services.gen.ts | 16 ++-- .../ui/openapi-gen/requests/types.gen.ts | 15 +++- .../core_api/routes/public/test_dag_run.py | 26 +++--- 12 files changed, 218 insertions(+), 83 deletions(-) create mode 100644 airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 03e87c478ff5e..e184a3acfe864 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -164,10 +164,3 @@ class DAGRunsBatchBody(StrictBaseModel): start_date_lte: AwareDatetime | None = None end_date_gte: AwareDatetime | None = None end_date_lte: AwareDatetime | None = None - - -class DAGRunWatchResult(StrictBaseModel): - """Status update from the dag run watch endpoint.""" - - duration: float | None - state: DagRunState diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index b1cf8ed71b020..0e5d0a75d4d0c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2147,14 +2147,13 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch: + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait: get: tags: - DagRun - summary: Watch Dag Run Until Finished - description: Watch a dag run until it reaches a finished state (e.g. success - or failed). - operationId: watch_dag_run_until_finished + summary: Wait Dag Run Until Finished + description: Wait for a dag run until it finishes, and return its return value. + operationId: wait_dag_run_until_finished security: - OAuth2PasswordBearer: [] parameters: @@ -2176,13 +2175,35 @@ paths: schema: type: number exclusiveMinimum: 0.0 + description: Seconds to wait between dag run state checks title: Interval + description: Seconds to wait between dag run state checks + - name: collect + in: query + required: false + schema: + anyOf: + - type: array + items: + type: string + - type: 'null' + description: Collect return value XCom from task. Can be set multiple times. + title: Collect + description: Collect return value XCom from task. Can be set multiple times. responses: '200': description: Successful Response content: application/json: schema: {} + application/x-ndjson: + schema: + type: string + example: '{"state": "running"} + + {"state": "success", "returns": {"op": 42}} + + ' '401': content: application/json: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 9432fbbeb864f..c2fca613dc69c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -17,8 +17,7 @@ from __future__ import annotations -import asyncio -from collections.abc import AsyncGenerator +import textwrap from typing import Annotated, Literal, cast import structlog @@ -54,6 +53,7 @@ search_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.assets import AssetEventCollectionResponse from airflow.api_fastapi.core_api.datamodels.dag_run import ( DAGRunClearBody, @@ -62,7 +62,6 @@ DAGRunPatchStates, DAGRunResponse, DAGRunsBatchBody, - DAGRunWatchResult, TriggerDAGRunPostBody, ) from airflow.api_fastapi.core_api.datamodels.task_instances import ( @@ -76,12 +75,12 @@ requires_access_asset, requires_access_dag, ) +from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager from airflow.models import DAG, DagModel, DagRun -from airflow.utils.session import create_session_async -from airflow.utils.state import DagRunState, State +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType log = structlog.get_logger(__name__) @@ -443,37 +442,52 @@ def trigger_dag_run( raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) -async def _watch_dagrun(dag_id: str, run_id: str, interval: float) -> AsyncGenerator[str, None]: - async with create_session_async() as session: - dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id)) - yield DAGRunWatchResult.model_validate(dag_run, from_attributes=True).model_dump_json() - yield "\n" - while dag_run.state not in State.finished_dr_states: - await asyncio.sleep(interval) - async with create_session_async() as session: - dag_run = await session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=run_id)) - yield DAGRunWatchResult.model_validate(dag_run, from_attributes=True).model_dump_json() - yield "\n" - - @dag_run_router.get( - "/{dag_run_id}/watch", - responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + "/{dag_run_id}/wait", + responses={ + **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + status.HTTP_200_OK: { + "description": "Successful Response", + "content": { + Mimetype.NDJSON: { + "schema": { + "type": "string", + "example": textwrap.dedent( + """\ + {"state": "running"} + {"state": "success", "returns": {"op": 42}} + """ + ), + } + } + }, + }, + }, dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN))], ) -def watch_dag_run_until_finished( +def wait_dag_run_until_finished( dag_id: str, dag_run_id: str, - interval: Annotated[float, Query(gt=0.0)], session: SessionDep, + interval: Annotated[float, Query(gt=0.0, description="Seconds to wait between dag run state checks")], + collect_task_ids: Annotated[ + list[str] | None, + Query(alias="collect", description="Collect return value XCom from task. Can be set multiple times."), + ] = None, ): - "Watch a dag run until it reaches a finished state (e.g. success or failed)." + "Wait for a dag run until it finishes, and return its return value." if not session.scalar(select(1).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)): raise HTTPException( status.HTTP_404_NOT_FOUND, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", ) - return StreamingResponse(_watch_dagrun(dag_id, dag_run_id, interval)) + waiter = DagRunWaiter( + dag_id=dag_id, + run_id=dag_run_id, + interval=interval, + collect_task_ids=collect_task_ids, + ) + return StreamingResponse(waiter.wait()) @dag_run_router.post( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py new file mode 100644 index 0000000000000..dc1c8e4515c1a --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import asyncio +import itertools +import json +import operator +from typing import TYPE_CHECKING, Any + +import attrs +from sqlalchemy import select + +from airflow.models.dagrun import DagRun +from airflow.models.xcom import XCOM_RETURN_KEY, XComModel +from airflow.utils.session import create_session_async +from airflow.utils.state import State + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Iterator + + +@attrs.define +class DagRunWaiter: + """Wait for the specified dag run to finish, and collect info from it.""" + + dag_id: str + run_id: str + interval: float + collect_task_ids: list[str] | None + + async def _get_dag_run(self) -> DagRun: + async with create_session_async() as session: + return await session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id)) + + def _serialize_xcoms(self) -> dict[str, Any]: + xcom_query = XComModel.get_many( + run_id=self.run_id, + key=XCOM_RETURN_KEY, + task_ids=self.collect_task_ids, + dag_ids=self.dag_id, + ) + xcom_query = xcom_query.order_by(XComModel.task_id, XComModel.map_index) + + def _group_xcoms(g: Iterator[XComModel]) -> Any: + entries = list(g) + if len(entries) == 1 and entries[0].map_index < 0: # Unpack non-mapped task xcom. + return entries[0].value + return [entry.value for entry in entries] # Task is mapped; return all xcoms in a list. + + return { + task_id: _group_xcoms(g) + for task_id, g in itertools.groupby(xcom_query, key=operator.attrgetter("task_id")) + } + + def _serialize_response(self, dag_run: DagRun) -> str: + resp = {"state": dag_run.state} + if dag_run.state not in State.finished_dr_states: + return json.dumps(resp) + if self.collect_task_ids: + resp["returns"] = self._serialize_xcoms() + return json.dumps(resp) + + async def wait(self) -> AsyncGenerator[str, None]: + yield self._serialize_response(dag_run := await self._get_dag_run()) + yield "\n" + while dag_run.state not in State.finished_dr_states: + await asyncio.sleep(self.interval) + yield self._serialize_response(dag_run := await self._get_dag_run()) + yield "\n" diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 6f02e1e52bc4a..42475cd404019 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -159,14 +159,15 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, endDateGte, endDateLte, updatedAtGte?: string; updatedAtLte?: string; }, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }])]; -export type DagRunServiceWatchDagRunUntilFinishedDefaultResponse = Awaited>; -export type DagRunServiceWatchDagRunUntilFinishedQueryResult = UseQueryResult; -export const useDagRunServiceWatchDagRunUntilFinishedKey = "DagRunServiceWatchDagRunUntilFinished"; -export const UseDagRunServiceWatchDagRunUntilFinishedKeyFn = ({ dagId, dagRunId, interval }: { +export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; +export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; +export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; +export const UseDagRunServiceWaitDagRunUntilFinishedKeyFn = ({ collect, dagId, dagRunId, interval }: { + collect?: string[]; dagId: string; dagRunId: string; interval: number; -}, queryKey?: Array) => [useDagRunServiceWatchDagRunUntilFinishedKey, ...(queryKey ?? [{ dagId, dagRunId, interval }])]; +}, queryKey?: Array) => [useDagRunServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ collect, dagId, dagRunId, interval }])]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited>; export type DagSourceServiceGetDagSourceQueryResult = UseQueryResult; export const useDagSourceServiceGetDagSourceKey = "DagSourceServiceGetDagSource"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 3514c1a92a8ac..3815f021abefc 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -296,20 +296,22 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtLte?: string; }) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** -* Watch Dag Run Until Finished -* Watch a dag run until it reaches a finished state (e.g. success or failed). +* Wait Dag Run Until Finished +* Wait for a dag run until it finishes, and return its return value. * @param data The data for the request. * @param data.dagId * @param data.dagRunId -* @param data.interval +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const ensureUseDagRunServiceWatchDagRunUntilFinishedData = (queryClient: QueryClient, { dagId, dagRunId, interval }: { +export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { + collect?: string[]; dagId: string; dagRunId: string; interval: number; -}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index aded3dc6f41ec..9b4d61a584e54 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -296,20 +296,22 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d updatedAtLte?: string; }) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** -* Watch Dag Run Until Finished -* Watch a dag run until it reaches a finished state (e.g. success or failed). +* Wait Dag Run Until Finished +* Wait for a dag run until it finishes, and return its return value. * @param data The data for the request. * @param data.dagId * @param data.dagRunId -* @param data.interval +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const prefetchUseDagRunServiceWatchDagRunUntilFinished = (queryClient: QueryClient, { dagId, dagRunId, interval }: { +export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { + collect?: string[]; dagId: string; dagRunId: string; interval: number; -}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 24f9cc099f5ed..ce8f8be77e565 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -296,20 +296,22 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** -* Watch Dag Run Until Finished -* Watch a dag run until it reaches a finished state (e.g. success or failed). +* Wait Dag Run Until Finished +* Wait for a dag run until it finishes, and return its return value. * @param data The data for the request. * @param data.dagId * @param data.dagRunId -* @param data.interval +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const useDagRunServiceWatchDagRunUntilFinished = = unknown[]>({ dagId, dagRunId, interval }: { +export const useDagRunServiceWaitDagRunUntilFinished = = unknown[]>({ collect, dagId, dagRunId, interval }: { + collect?: string[]; dagId: string; dagRunId: string; interval: number; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 1d3b939230905..a93226205ac79 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -296,20 +296,22 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** -* Watch Dag Run Until Finished -* Watch a dag run until it reaches a finished state (e.g. success or failed). +* Wait Dag Run Until Finished +* Wait for a dag run until it finishes, and return its return value. * @param data The data for the request. * @param data.dagId * @param data.dagRunId -* @param data.interval +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const useDagRunServiceWatchDagRunUntilFinishedSuspense = = unknown[]>({ dagId, dagRunId, interval }: { +export const useDagRunServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ collect, dagId, dagRunId, interval }: { + collect?: string[]; dagId: string; dagRunId: string; interval: number; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWatchDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.watchDagRunUntilFinished({ dagId, dagRunId, interval }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index e0b57af9852ce..c8e596943ada4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WatchDagRunUntilFinishedData, WatchDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; export class AssetService { /** @@ -1051,25 +1051,27 @@ export class DagRunService { } /** - * Watch Dag Run Until Finished - * Watch a dag run until it reaches a finished state (e.g. success or failed). + * Wait Dag Run Until Finished + * Wait for a dag run until it finishes, and return its return value. * @param data The data for the request. * @param data.dagId * @param data.dagRunId - * @param data.interval + * @param data.interval Seconds to wait between dag run state checks + * @param data.collect Collect return value XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ - public static watchDagRunUntilFinished(data: WatchDagRunUntilFinishedData): CancelablePromise { + public static waitDagRunUntilFinished(data: WaitDagRunUntilFinishedData): CancelablePromise { return __request(OpenAPI, { method: 'GET', - url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait', path: { dag_id: data.dagId, dag_run_id: data.dagRunId }, query: { - interval: data.interval + interval: data.interval, + collect: data.collect }, errors: { 401: 'Unauthorized', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 814e4f6fd1196..c8d5e02459fa8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2216,13 +2216,20 @@ export type TriggerDagRunData = { export type TriggerDagRunResponse = DAGRunResponse; -export type WatchDagRunUntilFinishedData = { +export type WaitDagRunUntilFinishedData = { + /** + * Collect return value XCom from task. Can be set multiple times. + */ + collect?: Array<(string)> | null; dagId: string; dagRunId: string; + /** + * Seconds to wait between dag run state checks + */ interval: number; }; -export type WatchDagRunUntilFinishedResponse = unknown; +export type WaitDagRunUntilFinishedResponse = unknown; export type GetListDagRunsBatchData = { dagId: "~"; @@ -3987,9 +3994,9 @@ export type $OpenApiTs = { }; }; }; - '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/watch': { + '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait': { get: { - req: WatchDagRunUntilFinishedData; + req: WaitDagRunUntilFinishedData; res: { /** * Successful Response diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 74a370706173b..ca103fe083535 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -104,12 +104,12 @@ def setup(request, dag_maker, session=None): dag_run1.note = (DAG1_RUN1_NOTE, "not_test") - for task in [task1, task2]: + for i, task in enumerate([task1, task2], start=1): ti = dag_run1.get_task_instance(task_id=task.task_id) ti.task = task ti.state = State.SUCCESS - session.merge(ti) + ti.xcom_push("return_value", f"result_{i}") dag_run2 = dag_maker.create_dagrun( run_id=DAG1_RUN2_ID, @@ -1623,7 +1623,7 @@ def test_should_respond_200_with_null_logical_date(self, test_client): } -class TestWatchDagRun: +class TestWaitDagRun: # The way we init async engine does not work well with FastAPI app init. # Creating the engine implicitly creates an event loop, which Airflow does # once for the entire process; creating the FastAPI app also does, but our @@ -1637,19 +1637,19 @@ def reconfigure_async_db_engine(self): _configure_async_session() def test_should_respond_401(self, unauthenticated_test_client): - response = unauthenticated_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/watch?interval=1") + response = unauthenticated_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1") assert response.status_code == 401 def test_should_respond_403(self, unauthorized_test_client): - response = unauthorized_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/watch?interval=1") + response = unauthorized_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1") assert response.status_code == 403 def test_should_respond_404(self, test_client): - response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/watch?interval=1") + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/wait?interval=1") assert response.status_code == 404 def test_should_respond_422_without_interval_param(self, test_client): - response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/watch") + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait") assert response.status_code == 422 @pytest.mark.parametrize( @@ -1657,9 +1657,13 @@ def test_should_respond_422_without_interval_param(self, test_client): [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)], ) def test_should_respond_200_immediately_for_finished_run(self, test_client, run_id, state): - response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/watch?interval=100") + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait?interval=100") + assert response.status_code == 200 + data = response.json() + assert data == {"state": state} + + def test_collect_task(self, test_client): + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=100&collect=task_1") assert response.status_code == 200 data = response.json() - assert isinstance(data, dict) - assert sorted(data) == ["duration", "state"] - assert data["state"] == state + assert data == {"state": DagRunState.SUCCESS, "returns": {"task_1": '"result_1"'}} From 77ec3ac7e8f940de12b287d2438c7f4b175b7e7d Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 7 Jul 2025 08:55:18 +0200 Subject: [PATCH 3/5] Use 'params' instead of literal params --- .../core_api/routes/public/test_dag_run.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index ca103fe083535..68bb37ac8c3dc 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1637,15 +1637,21 @@ def reconfigure_async_db_engine(self): _configure_async_session() def test_should_respond_401(self, unauthenticated_test_client): - response = unauthenticated_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1") + response = unauthenticated_test_client.get( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", + params={"interval": "1"}, + ) assert response.status_code == 401 def test_should_respond_403(self, unauthorized_test_client): - response = unauthorized_test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=1") + response = unauthorized_test_client.get( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", + params={"interval": "1"}, + ) assert response.status_code == 403 def test_should_respond_404(self, test_client): - response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/wait?interval=1") + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/does-not-exist/wait", params={"interval": "1"}) assert response.status_code == 404 def test_should_respond_422_without_interval_param(self, test_client): @@ -1657,13 +1663,15 @@ def test_should_respond_422_without_interval_param(self, test_client): [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)], ) def test_should_respond_200_immediately_for_finished_run(self, test_client, run_id, state): - response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait?interval=100") + response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait", params={"interval": "100"}) assert response.status_code == 200 data = response.json() assert data == {"state": state} def test_collect_task(self, test_client): - response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait?interval=100&collect=task_1") + response = test_client.get( + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", params={"interval": "1", "collect": "task_1"} + ) assert response.status_code == 200 data = response.json() assert data == {"state": DagRunState.SUCCESS, "returns": {"task_1": '"result_1"'}} From f852f6bb2c5f15e0df816067a12f74e68861bb65 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 7 Jul 2025 09:00:41 +0200 Subject: [PATCH 4/5] Mark endpoint as experimental --- .../openapi/v2-rest-api-generated.yaml | 7 +++- .../core_api/routes/public/dag_run.py | 3 ++ .../airflow/ui/openapi-gen/queries/common.ts | 11 +++++- .../ui/openapi-gen/queries/ensureQueryData.ts | 23 +++++++++-- .../ui/openapi-gen/queries/prefetch.ts | 23 +++++++++-- .../airflow/ui/openapi-gen/queries/queries.ts | 23 +++++++++-- .../ui/openapi-gen/queries/suspense.ts | 23 +++++++++-- .../ui/openapi-gen/requests/services.gen.ts | 39 ++++++++++++++++++- 8 files changed, 135 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 0e5d0a75d4d0c..324d75b6357b2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2151,8 +2151,11 @@ paths: get: tags: - DagRun - summary: Wait Dag Run Until Finished - description: Wait for a dag run until it finishes, and return its return value. + - experimental + summary: 'Experimental: Wait for a dag run to complete, and return task results + if requested.' + description: "\U0001F6A7 This is an experimental endpoint and may change or\ + \ be removed without notice." operationId: wait_dag_run_until_finished security: - OAuth2PasswordBearer: [] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index c2fca613dc69c..bb248d15464bd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -444,6 +444,9 @@ def trigger_dag_run( @dag_run_router.get( "/{dag_run_id}/wait", + tags=["experimental"], + summary="Experimental: Wait for a dag run to complete, and return task results if requested.", + description="🚧 This is an experimental endpoint and may change or be removed without notice.", responses={ **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), status.HTTP_200_OK: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 42475cd404019..4b39f78747144 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseQueryResult } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; export type AssetServiceGetAssetsDefaultResponse = Awaited>; export type AssetServiceGetAssetsQueryResult = UseQueryResult; @@ -168,6 +168,15 @@ export const UseDagRunServiceWaitDagRunUntilFinishedKeyFn = ({ collect, dagId, d dagRunId: string; interval: number; }, queryKey?: Array) => [useDagRunServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ collect, dagId, dagRunId, interval }])]; +export type ExperimentalServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; +export type ExperimentalServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; +export const useExperimentalServiceWaitDagRunUntilFinishedKey = "ExperimentalServiceWaitDagRunUntilFinished"; +export const UseExperimentalServiceWaitDagRunUntilFinishedKeyFn = ({ collect, dagId, dagRunId, interval }: { + collect?: string[]; + dagId: string; + dagRunId: string; + interval: number; +}, queryKey?: Array) => [useExperimentalServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ collect, dagId, dagRunId, interval }])]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited>; export type DagSourceServiceGetDagSourceQueryResult = UseQueryResult; export const useDagSourceServiceGetDagSourceKey = "DagSourceServiceGetDagSource"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 3815f021abefc..fff9e35c58cdc 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,8 +296,8 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtLte?: string; }) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** -* Wait Dag Run Until Finished -* Wait for a dag run until it finishes, and return its return value. +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. * @param data The data for the request. * @param data.dagId * @param data.dagRunId @@ -313,6 +313,23 @@ export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: Q interval: number; }) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const ensureUseExperimentalServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { + collect?: string[]; + dagId: string; + dagRunId: string; + interval: number; +}) => queryClient.ensureQueryData({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 9b4d61a584e54..9d4d0917e0fed 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { type QueryClient } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,8 +296,8 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d updatedAtLte?: string; }) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) }); /** -* Wait Dag Run Until Finished -* Wait for a dag run until it finishes, and return its return value. +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. * @param data The data for the request. * @param data.dagId * @param data.dagRunId @@ -313,6 +313,23 @@ export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: Que interval: number; }) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const prefetchUseExperimentalServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { + collect?: string[]; + dagId: string; + dagRunId: string; + interval: number; +}) => queryClient.prefetchQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index ce8f8be77e565..5a30a6f304aa0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,8 +296,8 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** -* Wait Dag Run Until Finished -* Wait for a dag run until it finishes, and return its return value. +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. * @param data The data for the request. * @param data.dagId * @param data.dagRunId @@ -313,6 +313,23 @@ export const useDagRunServiceWaitDagRunUntilFinished = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useExperimentalServiceWaitDagRunUntilFinished = = unknown[]>({ collect, dagId, dagRunId, interval }: { + collect?: string[]; + dagId: string; + dagRunId: string; + interval: number; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index a93226205ac79..9605dcefb6120 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1,7 +1,7 @@ // generated with @7nohe/openapi-react-query-codegen@1.6.2 import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; -import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; +import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen"; import { DagRunState, DagWarningType } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -296,8 +296,8 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGte, endDateLte, limit, logicalDateGte, logicalDateLte, offset, orderBy, runAfterGte, runAfterLte, runIdPattern, runType, startDateGte, startDateLte, state, updatedAtGte, updatedAtLte }) as TData, ...options }); /** -* Wait Dag Run Until Finished -* Wait for a dag run until it finishes, and return its return value. +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. * @param data The data for the request. * @param data.dagId * @param data.dagRunId @@ -313,6 +313,23 @@ export const useDagRunServiceWaitDagRunUntilFinishedSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); /** +* Experimental: Wait for a dag run to complete, and return task results if requested. +* 🚧 This is an experimental endpoint and may change or be removed without notice. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.interval Seconds to wait between dag run state checks +* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useExperimentalServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ collect, dagId, dagRunId, interval }: { + collect?: string[]; + dagId: string; + dagRunId: string; + interval: number; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); +/** * Get Dag Source * Get source code using file token. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index c8e596943ada4..d674a2cf22ae8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1051,8 +1051,8 @@ export class DagRunService { } /** - * Wait Dag Run Until Finished - * Wait for a dag run until it finishes, and return its return value. + * Experimental: Wait for a dag run to complete, and return task results if requested. + * 🚧 This is an experimental endpoint and may change or be removed without notice. * @param data The data for the request. * @param data.dagId * @param data.dagRunId @@ -1111,6 +1111,41 @@ export class DagRunService { } +export class ExperimentalService { + /** + * Experimental: Wait for a dag run to complete, and return task results if requested. + * 🚧 This is an experimental endpoint and may change or be removed without notice. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.interval Seconds to wait between dag run state checks + * @param data.collect Collect return value XCom from task. Can be set multiple times. + * @returns unknown Successful Response + * @throws ApiError + */ + public static waitDagRunUntilFinished(data: WaitDagRunUntilFinishedData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId + }, + query: { + interval: data.interval, + collect: data.collect + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + +} + export class DagSourceService { /** * Get Dag Source From 58d7fa5db53ab90f9a35c91c47465808e7722306 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 9 Jul 2025 07:28:36 +0200 Subject: [PATCH 5/5] Rename param key --- .../core_api/openapi/v2-rest-api-generated.yaml | 10 +++++----- .../core_api/routes/public/dag_run.py | 10 +++++----- .../core_api/services/public/dag_run.py | 8 ++++---- .../src/airflow/ui/openapi-gen/queries/common.ts | 12 ++++++------ .../ui/openapi-gen/queries/ensureQueryData.ts | 16 ++++++++-------- .../airflow/ui/openapi-gen/queries/prefetch.ts | 16 ++++++++-------- .../airflow/ui/openapi-gen/queries/queries.ts | 16 ++++++++-------- .../airflow/ui/openapi-gen/queries/suspense.ts | 16 ++++++++-------- .../ui/openapi-gen/requests/services.gen.ts | 8 ++++---- .../airflow/ui/openapi-gen/requests/types.gen.ts | 8 ++++---- .../core_api/routes/public/test_dag_run.py | 4 ++-- 11 files changed, 62 insertions(+), 62 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 324d75b6357b2..073e2c9c8a38e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2181,7 +2181,7 @@ paths: description: Seconds to wait between dag run state checks title: Interval description: Seconds to wait between dag run state checks - - name: collect + - name: result in: query required: false schema: @@ -2190,9 +2190,9 @@ paths: items: type: string - type: 'null' - description: Collect return value XCom from task. Can be set multiple times. - title: Collect - description: Collect return value XCom from task. Can be set multiple times. + description: Collect result XCom from task. Can be set multiple times. + title: Result + description: Collect result XCom from task. Can be set multiple times. responses: '200': description: Successful Response @@ -2204,7 +2204,7 @@ paths: type: string example: '{"state": "running"} - {"state": "success", "returns": {"op": 42}} + {"state": "success", "results": {"op": 42}} ' '401': diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index bb248d15464bd..5d974ec012cd5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -458,7 +458,7 @@ def trigger_dag_run( "example": textwrap.dedent( """\ {"state": "running"} - {"state": "success", "returns": {"op": 42}} + {"state": "success", "results": {"op": 42}} """ ), } @@ -473,12 +473,12 @@ def wait_dag_run_until_finished( dag_run_id: str, session: SessionDep, interval: Annotated[float, Query(gt=0.0, description="Seconds to wait between dag run state checks")], - collect_task_ids: Annotated[ + result_task_ids: Annotated[ list[str] | None, - Query(alias="collect", description="Collect return value XCom from task. Can be set multiple times."), + Query(alias="result", description="Collect result XCom from task. Can be set multiple times."), ] = None, ): - "Wait for a dag run until it finishes, and return its return value." + "Wait for a dag run until it finishes, and return its result(s)." if not session.scalar(select(1).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)): raise HTTPException( status.HTTP_404_NOT_FOUND, @@ -488,7 +488,7 @@ def wait_dag_run_until_finished( dag_id=dag_id, run_id=dag_run_id, interval=interval, - collect_task_ids=collect_task_ids, + result_task_ids=result_task_ids, ) return StreamingResponse(waiter.wait()) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py index dc1c8e4515c1a..259389e799494 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -42,7 +42,7 @@ class DagRunWaiter: dag_id: str run_id: str interval: float - collect_task_ids: list[str] | None + result_task_ids: list[str] | None async def _get_dag_run(self) -> DagRun: async with create_session_async() as session: @@ -52,7 +52,7 @@ def _serialize_xcoms(self) -> dict[str, Any]: xcom_query = XComModel.get_many( run_id=self.run_id, key=XCOM_RETURN_KEY, - task_ids=self.collect_task_ids, + task_ids=self.result_task_ids, dag_ids=self.dag_id, ) xcom_query = xcom_query.order_by(XComModel.task_id, XComModel.map_index) @@ -72,8 +72,8 @@ def _serialize_response(self, dag_run: DagRun) -> str: resp = {"state": dag_run.state} if dag_run.state not in State.finished_dr_states: return json.dumps(resp) - if self.collect_task_ids: - resp["returns"] = self._serialize_xcoms() + if self.result_task_ids: + resp["results"] = self._serialize_xcoms() return json.dumps(resp) async def wait(self) -> AsyncGenerator[str, None]: diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 4b39f78747144..8410c0ba4748b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -162,21 +162,21 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, endDateGte, endDateLte, export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; -export const UseDagRunServiceWaitDagRunUntilFinishedKeyFn = ({ collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const UseDagRunServiceWaitDagRunUntilFinishedKeyFn = ({ dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}, queryKey?: Array) => [useDagRunServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ collect, dagId, dagRunId, interval }])]; + result?: string[]; +}, queryKey?: Array) => [useDagRunServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ dagId, dagRunId, interval, result }])]; export type ExperimentalServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; export type ExperimentalServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; export const useExperimentalServiceWaitDagRunUntilFinishedKey = "ExperimentalServiceWaitDagRunUntilFinished"; -export const UseExperimentalServiceWaitDagRunUntilFinishedKeyFn = ({ collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const UseExperimentalServiceWaitDagRunUntilFinishedKeyFn = ({ dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}, queryKey?: Array) => [useExperimentalServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ collect, dagId, dagRunId, interval }])]; + result?: string[]; +}, queryKey?: Array) => [useExperimentalServiceWaitDagRunUntilFinishedKey, ...(queryKey ?? [{ dagId, dagRunId, interval, result }])]; export type DagSourceServiceGetDagSourceDefaultResponse = Awaited>; export type DagSourceServiceGetDagSourceQueryResult = UseQueryResult; export const useDagSourceServiceGetDagSourceKey = "DagSourceServiceGetDagSource"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index fff9e35c58cdc..7ccd7ca1a9de4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -302,16 +302,16 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { * @param data.dagId * @param data.dagRunId * @param data.interval Seconds to wait between dag run state checks -* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @param data.result Collect result XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); + result?: string[]; +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice. @@ -319,16 +319,16 @@ export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: Q * @param data.dagId * @param data.dagRunId * @param data.interval Seconds to wait between dag run state checks -* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @param data.result Collect result XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const ensureUseExperimentalServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const ensureUseExperimentalServiceWaitDagRunUntilFinishedData = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}) => queryClient.ensureQueryData({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); + result?: string[]; +}) => queryClient.ensureQueryData({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 9d4d0917e0fed..60fc346fdc62a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -302,16 +302,16 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d * @param data.dagId * @param data.dagRunId * @param data.interval Seconds to wait between dag run state checks -* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @param data.result Collect result XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); + result?: string[]; +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice. @@ -319,16 +319,16 @@ export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: Que * @param data.dagId * @param data.dagRunId * @param data.interval Seconds to wait between dag run state checks -* @param data.collect Collect return value XCom from task. Can be set multiple times. +* @param data.result Collect result XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ -export const prefetchUseExperimentalServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const prefetchUseExperimentalServiceWaitDagRunUntilFinished = (queryClient: QueryClient, { dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}) => queryClient.prefetchQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) }); + result?: string[]; +}) => queryClient.prefetchQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 5a30a6f304aa0..d2a6abe0a6b22 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -302,16 +302,16 @@ export const useDagRunServiceGetDagRuns = = unknown[]>({ collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const useDagRunServiceWaitDagRunUntilFinished = = unknown[]>({ dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice. @@ -319,16 +319,16 @@ export const useDagRunServiceWaitDagRunUntilFinished = = unknown[]>({ collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const useExperimentalServiceWaitDagRunUntilFinished = = unknown[]>({ dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 9605dcefb6120..a8b84a5a173a3 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -302,16 +302,16 @@ export const useDagRunServiceGetDagRunsSuspense = = unknown[]>({ collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const useDagRunServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice. @@ -319,16 +319,16 @@ export const useDagRunServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ collect, dagId, dagRunId, interval }: { - collect?: string[]; +export const useExperimentalServiceWaitDagRunUntilFinishedSuspense = = unknown[]>({ dagId, dagRunId, interval, result }: { dagId: string; dagRunId: string; interval: number; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ collect, dagId, dagRunId, interval }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ collect, dagId, dagRunId, interval }) as TData, ...options }); + result?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseExperimentalServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => ExperimentalService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options }); /** * Get Dag Source * Get source code using file token. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index d674a2cf22ae8..ba77ae0668075 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1057,7 +1057,7 @@ export class DagRunService { * @param data.dagId * @param data.dagRunId * @param data.interval Seconds to wait between dag run state checks - * @param data.collect Collect return value XCom from task. Can be set multiple times. + * @param data.result Collect result XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ @@ -1071,7 +1071,7 @@ export class DagRunService { }, query: { interval: data.interval, - collect: data.collect + result: data.result }, errors: { 401: 'Unauthorized', @@ -1119,7 +1119,7 @@ export class ExperimentalService { * @param data.dagId * @param data.dagRunId * @param data.interval Seconds to wait between dag run state checks - * @param data.collect Collect return value XCom from task. Can be set multiple times. + * @param data.result Collect result XCom from task. Can be set multiple times. * @returns unknown Successful Response * @throws ApiError */ @@ -1133,7 +1133,7 @@ export class ExperimentalService { }, query: { interval: data.interval, - collect: data.collect + result: data.result }, errors: { 401: 'Unauthorized', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index c8d5e02459fa8..80b644ce63f73 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2217,16 +2217,16 @@ export type TriggerDagRunData = { export type TriggerDagRunResponse = DAGRunResponse; export type WaitDagRunUntilFinishedData = { - /** - * Collect return value XCom from task. Can be set multiple times. - */ - collect?: Array<(string)> | null; dagId: string; dagRunId: string; /** * Seconds to wait between dag run state checks */ interval: number; + /** + * Collect result XCom from task. Can be set multiple times. + */ + result?: Array<(string)> | null; }; export type WaitDagRunUntilFinishedResponse = unknown; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 68bb37ac8c3dc..6ce599e0c675b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1670,8 +1670,8 @@ def test_should_respond_200_immediately_for_finished_run(self, test_client, run_ def test_collect_task(self, test_client): response = test_client.get( - f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", params={"interval": "1", "collect": "task_1"} + f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", params={"interval": "1", "result": "task_1"} ) assert response.status_code == 200 data = response.json() - assert data == {"state": DagRunState.SUCCESS, "returns": {"task_1": '"result_1"'}} + assert data == {"state": DagRunState.SUCCESS, "results": {"task_1": '"result_1"'}}