From acc3312ce6f12a965bf690273a3ae14065a1f7ea Mon Sep 17 00:00:00 2001 From: Michael Smith-Chandler Date: Thu, 21 Nov 2024 22:51:38 +0000 Subject: [PATCH 1/5] Add xcom get entries fastapi endpoint --- .../api_connexion/endpoints/xcom_endpoint.py | 3 +- .../api_fastapi/core_api/datamodels/xcom.py | 7 +++ .../core_api/routes/public/xcom.py | 61 ++++++++++++++++++- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index c86617391ab12..d909b36e2a7c5 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -45,6 +45,7 @@ from airflow.api_connexion.types import APIResponse +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.XCOM) @format_parameters({"limit": check_limit}) @provide_session @@ -68,7 +69,7 @@ def get_xcom_entries( else: query = query.where(XCom.dag_id == dag_id) query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id)) - + if task_id != "~": query = query.where(XCom.task_id == task_id) if dag_run_id != "~": diff --git a/airflow/api_fastapi/core_api/datamodels/xcom.py b/airflow/api_fastapi/core_api/datamodels/xcom.py index 370aa651cb2c8..8613be34ea2fc 100644 --- a/airflow/api_fastapi/core_api/datamodels/xcom.py +++ b/airflow/api_fastapi/core_api/datamodels/xcom.py @@ -49,3 +49,10 @@ class XComResponseString(XComResponse): @field_validator("value", mode="before") def value_to_string(cls, v): return str(v) if v is not None else None + +class XComCollection(BaseModel): + """List of XCom items.""" + + xcom_entries: List[XComResponse] + total_entries: int + diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow/api_fastapi/core_api/routes/public/xcom.py index 3a8e6130e452c..140c3e81852a3 100644 --- a/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -19,16 +19,19 @@ import copy from typing import Annotated -from fastapi import Depends, HTTPException, Query, status +from fastapi import Depends, HTTPException, Query, status, SortParam from sqlalchemy import and_, select from sqlalchemy.orm import Session -from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.db.common import get_session, paginated_select from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.xcom import ( XComResponseNative, XComResponseString, + XComCollection ) + +from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.models import DagRun as DR, XCom from airflow.settings import conf @@ -90,5 +93,57 @@ def get_xcom_entry( if stringify: return XComResponseString.model_validate(item) - return XComResponseNative.model_validate(item) + +@xcom_router.get( + "/{xcom_key}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_xcom_entries( + dag_id: str, + dag_run_id: str, + task_id: str, + limit: QueryLimit, + offset: QueryOffset, + session: Annotated[Session, Depends(get_session)], + stringify: Annotated[bool, Query()] = True, + xcom_key: Annotated[str | None, Query()] = None, + map_index: Annotated[int | None, Query()] = None, +) -> XComCollection: + """ + Get all XCom entries. + + This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs. + """ + query = select(XCom) + if dag_id != "~": + query = query.where(XCom.dag_id == dag_id) + query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id)) + + if task_id != "~": + query = query.where(XCom.task_id == task_id) + if dag_run_id != "~": + query = query.where(DR.run_id == dag_run_id) + if map_index is not None: + query = query.where(XCom.map_index == map_index) + if xcom_key is not None: + query = query.where(XCom.key == xcom_key) + + query, total_entries = paginated_select( + query, + [], + SortParam(["dag_id", "task_id", "run_id", "map_index", "key"], XCom), + offset, + limit, + session + ) + xcoms = session.scalars(query) + return XComCollection( + xcom_entries=[XComResponse.model_validate(xcom) for xcom in xcoms], + total_entries=total_entries + ) From 5dc76414c450b97e78e2b7e73adf2ad9e9f7db84 Mon Sep 17 00:00:00 2001 From: Michael Smith-Chandler Date: Mon, 25 Nov 2024 18:30:35 +0000 Subject: [PATCH 2/5] Add tests, fixes and api spec --- .../api_connexion/endpoints/xcom_endpoint.py | 2 +- .../api_fastapi/core_api/datamodels/xcom.py | 4 +- .../core_api/openapi/v1-generated.yaml | 336 +++++++++++++----- .../core_api/routes/public/__init__.py | 2 +- .../core_api/routes/public/xcom.py | 33 +- .../core_api/routes/public/test_xcom.py | 297 +++++++++++++++- 6 files changed, 551 insertions(+), 123 deletions(-) diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index d909b36e2a7c5..cb3faf7379e0a 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -69,7 +69,7 @@ def get_xcom_entries( else: query = query.where(XCom.dag_id == dag_id) query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id)) - + if task_id != "~": query = query.where(XCom.task_id == task_id) if dag_run_id != "~": diff --git a/airflow/api_fastapi/core_api/datamodels/xcom.py b/airflow/api_fastapi/core_api/datamodels/xcom.py index 8613be34ea2fc..4e3c6f54a7a4f 100644 --- a/airflow/api_fastapi/core_api/datamodels/xcom.py +++ b/airflow/api_fastapi/core_api/datamodels/xcom.py @@ -50,9 +50,9 @@ class XComResponseString(XComResponse): def value_to_string(cls, v): return str(v) if v is not None else None + class XComCollection(BaseModel): """List of XCom items.""" - xcom_entries: List[XComResponse] + xcom_entries: list[XComResponse] total_entries: int - diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 2610dbcf6123c..c53f68a8438af 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3651,6 +3651,200 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}: + get: + tags: + - XCom + summary: Get Xcom Entry + description: Get an XCom entry. + operationId: get_xcom_entry + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: xcom_key + in: path + required: true + schema: + type: string + title: Xcom Key + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + - name: deserialize + in: query + required: false + schema: + type: boolean + default: false + title: Deserialize + - name: stringify + in: query + required: false + schema: + type: boolean + default: true + title: Stringify + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/XComResponseNative' + - $ref: '#/components/schemas/XComResponseString' + title: Response Get Xcom Entry + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries: + get: + tags: + - XCom + summary: Get Xcom Entries + description: 'Get all XCom entries. + + + This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to + retrieve XCom entries for all DAGs.' + operationId: get_xcom_entries + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: xcom_key + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Xcom Key + - name: map_index + in: query + required: false + schema: + anyOf: + - type: integer + minimum: -1 + - type: 'null' + title: Map Index + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/XComCollection' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}: get: tags: @@ -5199,100 +5393,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}: - get: - tags: - - XCom - summary: Get Xcom Entry - description: Get an XCom entry. - operationId: get_xcom_entry - parameters: - - name: dag_id - in: path - required: true - schema: - type: string - title: Dag Id - - name: task_id - in: path - required: true - schema: - type: string - title: Task Id - - name: dag_run_id - in: path - required: true - schema: - type: string - title: Dag Run Id - - name: xcom_key - in: path - required: true - schema: - type: string - title: Xcom Key - - name: map_index - in: query - required: false - schema: - type: integer - minimum: -1 - default: -1 - title: Map Index - - name: deserialize - in: query - required: false - schema: - type: boolean - default: false - title: Deserialize - - name: stringify - in: query - required: false - schema: - type: boolean - default: true - title: Stringify - responses: - '200': - description: Successful Response - content: - application/json: - schema: - anyOf: - - $ref: '#/components/schemas/XComResponseNative' - - $ref: '#/components/schemas/XComResponseString' - title: Response Get Xcom Entry - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Bad Request - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}: get: tags: @@ -8715,6 +8815,54 @@ components: - git_version title: VersionInfo description: Version information serializer for responses. + XComCollection: + properties: + xcom_entries: + items: + $ref: '#/components/schemas/XComResponse' + type: array + title: Xcom Entries + total_entries: + type: integer + title: Total Entries + type: object + required: + - xcom_entries + - total_entries + title: XComCollection + description: List of XCom items. + XComResponse: + properties: + key: + type: string + title: Key + timestamp: + type: string + format: date-time + title: Timestamp + logical_date: + type: string + format: date-time + title: Logical Date + map_index: + type: integer + title: Map Index + task_id: + type: string + title: Task Id + dag_id: + type: string + title: Dag Id + type: object + required: + - key + - timestamp + - logical_date + - map_index + - task_id + - dag_id + title: XComResponse + description: Serializer for a xcom item. XComResponseNative: properties: key: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index e2dbed54f710c..3e05eb876802c 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -68,10 +68,10 @@ authenticated_router.include_router(plugins_router) authenticated_router.include_router(pools_router) authenticated_router.include_router(providers_router) +authenticated_router.include_router(xcom_router) authenticated_router.include_router(task_instances_router) authenticated_router.include_router(tasks_router) authenticated_router.include_router(variables_router) -authenticated_router.include_router(xcom_router) authenticated_router.include_router(task_instances_log_router) diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow/api_fastapi/core_api/routes/public/xcom.py index 140c3e81852a3..59ff2194e32ee 100644 --- a/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -19,19 +19,19 @@ import copy from typing import Annotated -from fastapi import Depends, HTTPException, Query, status, SortParam +from fastapi import Depends, HTTPException, Query, status from sqlalchemy import and_, select from sqlalchemy.orm import Session from airflow.api_fastapi.common.db.common import get_session, paginated_select +from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.xcom import ( + XComCollection, + XComResponse, XComResponseNative, XComResponseString, - XComCollection ) - -from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.models import DagRun as DR, XCom from airflow.settings import conf @@ -95,8 +95,9 @@ def get_xcom_entry( return XComResponseString.model_validate(item) return XComResponseNative.model_validate(item) + @xcom_router.get( - "/{xcom_key}", + "", responses=create_openapi_http_exception_doc( [ status.HTTP_400_BAD_REQUEST, @@ -111,20 +112,19 @@ def get_xcom_entries( limit: QueryLimit, offset: QueryOffset, session: Annotated[Session, Depends(get_session)], - stringify: Annotated[bool, Query()] = True, xcom_key: Annotated[str | None, Query()] = None, - map_index: Annotated[int | None, Query()] = None, + map_index: Annotated[int | None, Query(ge=-1)] = None, ) -> XComCollection: """ Get all XCom entries. - + This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs. """ query = select(XCom) if dag_id != "~": query = query.where(XCom.dag_id == dag_id) query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id)) - + if task_id != "~": query = query.where(XCom.task_id == task_id) if dag_run_id != "~": @@ -135,15 +135,14 @@ def get_xcom_entries( query = query.where(XCom.key == xcom_key) query, total_entries = paginated_select( - query, - [], - SortParam(["dag_id", "task_id", "run_id", "map_index", "key"], XCom), - offset, - limit, - session + select=query, + filters=[], + order_by=SortParam(["dag_id", "task_id", "run_id", "map_index", "key"], XCom), + offset=offset, + limit=limit, + session=session, ) xcoms = session.scalars(query) return XComCollection( - xcom_entries=[XComResponse.model_validate(xcom) for xcom in xcoms], - total_entries=total_entries + xcom_entries=[XComResponse.model_validate(xcom) for xcom in xcoms], total_entries=total_entries ) diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py b/tests/api_fastapi/core_api/routes/public/test_xcom.py index e0010c79fef1a..8697ee89fc01a 100644 --- a/tests/api_fastapi/core_api/routes/public/test_xcom.py +++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py @@ -21,6 +21,7 @@ import pytest from airflow.models import XCom +from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.models.xcom import BaseXCom, resolve_xcom_backend @@ -36,13 +37,17 @@ TEST_XCOM_KEY = "test_xcom_key" TEST_XCOM_VALUE = {"key": "value"} -TEST_XCOM_KEY3 = "test_xcom_key_non_existing" +TEST_XCOM_KEY_2 = "test_xcom_key_non_existing" TEST_DAG_ID = "test-dag-id" TEST_TASK_ID = "test-task-id" TEST_EXECUTION_DATE = "2005-04-02T00:00:00+00:00" +TEST_DAG_ID_2 = "test-dag-id-2" +TEST_TASK_ID_2 = "test-task-id-2" + logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE) +logical_date_formatted = logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ") run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed) @@ -92,18 +97,18 @@ def clear_db(): @pytest.fixture(autouse=True) def setup(self) -> None: self.clear_db() + _create_dag_run() def teardown_method(self) -> None: self.clear_db() - def create_xcom(self, key, value, backend=XCom) -> None: - _create_dag_run() + def _create_xcom(self, key, value, backend=XCom) -> None: _create_xcom(key, value, backend) class TestGetXComEntry(TestXComEndpoint): def test_should_respond_200_stringify(self, test_client): - self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) + self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) response = test_client.get( f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" ) @@ -121,7 +126,7 @@ def test_should_respond_200_stringify(self, test_client): } def test_should_respond_200_native(self, test_client): - self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) + self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) response = test_client.get( f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}?stringify=false" ) @@ -140,10 +145,10 @@ def test_should_respond_200_native(self, test_client): def test_should_raise_404_for_non_existent_xcom(self, test_client): response = test_client.get( - f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY3}" + f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY_2}" ) assert response.status_code == 404 - assert response.json()["detail"] == f"XCom entry with key: `{TEST_XCOM_KEY3}` not found" + assert response.json()["detail"] == f"XCom entry with key: `{TEST_XCOM_KEY_2}` not found" @pytest.mark.parametrize( "support_deserialize, params, expected_status_or_value", @@ -191,7 +196,7 @@ def test_custom_xcom_deserialize( self, support_deserialize: bool, params: str, expected_status_or_value: int | str, test_client ): XCom = resolve_xcom_backend() - self.create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom) + self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE, backend=XCom) url = f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" with mock.patch("airflow.api_fastapi.core_api.routes.public.xcom.XCom", XCom): @@ -203,3 +208,279 @@ def test_custom_xcom_deserialize( else: assert response.status_code == 200 assert response.json()["value"] == expected_status_or_value + + +class TestGetXComEntries(TestXComEndpoint): + def test_should_respond_200(self, test_client): + self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, TEST_TASK_ID) + response = test_client.get( + f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries" + ) + assert response.status_code == 200 + response_data = response.json() + for xcom_entry in response_data["xcom_entries"]: + xcom_entry["timestamp"] = "TIMESTAMP" + + expected_response = { + "xcom_entries": [ + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": f"{TEST_XCOM_KEY}-0", + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": -1, + }, + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": f"{TEST_XCOM_KEY}-1", + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": -1, + }, + ], + "total_entries": 2, + } + assert response_data == expected_response + + def test_should_respond_200_with_tilde(self, test_client): + self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, TEST_TASK_ID) + self._create_xcom_entries(TEST_DAG_ID_2, run_id, logical_date_parsed, TEST_TASK_ID_2) + + response = test_client.get("/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries") + assert response.status_code == 200 + response_data = response.json() + for xcom_entry in response_data["xcom_entries"]: + xcom_entry["timestamp"] = "TIMESTAMP" + + expected_response = { + "xcom_entries": [ + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": f"{TEST_XCOM_KEY}-0", + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": -1, + }, + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": f"{TEST_XCOM_KEY}-1", + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": -1, + }, + { + "dag_id": TEST_DAG_ID_2, + "logical_date": logical_date_formatted, + "key": f"{TEST_XCOM_KEY}-0", + "task_id": TEST_TASK_ID_2, + "timestamp": "TIMESTAMP", + "map_index": -1, + }, + { + "dag_id": TEST_DAG_ID_2, + "logical_date": logical_date_formatted, + "key": f"{TEST_XCOM_KEY}-1", + "task_id": TEST_TASK_ID_2, + "timestamp": "TIMESTAMP", + "map_index": -1, + }, + ], + "total_entries": 4, + } + assert response_data == expected_response + + @pytest.mark.parametrize("map_index", (0, 1, None)) + def test_should_respond_200_with_map_index(self, map_index, test_client): + self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, TEST_TASK_ID, mapped_ti=True) + + response = test_client.get( + "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries", + params={"map_index": map_index} if map_index is not None else None, + ) + assert response.status_code == 200 + response_data = response.json() + + if map_index is None: + expected_entries = [ + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": TEST_XCOM_KEY, + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": idx, + } + for idx in range(2) + ] + else: + expected_entries = [ + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": TEST_XCOM_KEY, + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": map_index, + } + ] + for xcom_entry in response_data["xcom_entries"]: + xcom_entry["timestamp"] = "TIMESTAMP" + assert response_data == { + "xcom_entries": expected_entries, + "total_entries": len(expected_entries), + } + + @pytest.mark.parametrize( + "key, expected_entries", + [ + ( + TEST_XCOM_KEY, + [ + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": TEST_XCOM_KEY, + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": 0, + }, + { + "dag_id": TEST_DAG_ID, + "logical_date": logical_date_formatted, + "key": TEST_XCOM_KEY, + "task_id": TEST_TASK_ID, + "timestamp": "TIMESTAMP", + "map_index": 1, + }, + ], + ), + (f"{TEST_XCOM_KEY}-0", []), + ], + ) + def test_should_respond_200_with_xcom_key(self, key, expected_entries, test_client): + self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, TEST_TASK_ID, mapped_ti=True) + response = test_client.get( + "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries", + params={"xcom_key": key} if key is not None else None, + ) + + assert response.status_code == 200 + response_data = response.json() + for xcom_entry in response_data["xcom_entries"]: + xcom_entry["timestamp"] = "TIMESTAMP" + assert response_data == { + "xcom_entries": expected_entries, + "total_entries": len(expected_entries), + } + + @provide_session + def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id, mapped_ti=False, session=None): + dag = DagModel(dag_id=dag_id) + session.add(dag) + dagrun = DagRun( + dag_id=dag_id, + run_id=run_id, + logical_date=logical_date, + start_date=logical_date, + run_type=DagRunType.MANUAL, + ) + session.add(dagrun) + if mapped_ti: + for i in [0, 1]: + ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id, map_index=i) + ti.dag_id = dag_id + session.add(ti) + else: + ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id) + ti.dag_id = dag_id + session.add(ti) + session.commit() + + for i in [0, 1]: + if mapped_ti: + key = TEST_XCOM_KEY + map_index = i + else: + key = f"{TEST_XCOM_KEY}-{i}" + map_index = -1 + + XCom.set( + key=key, + value=TEST_XCOM_VALUE, + run_id=run_id, + task_id=task_id, + dag_id=dag_id, + map_index=map_index, + ) + + @pytest.fixture(autouse=True) + def setup(self) -> None: + self.clear_db() + + +class TestPaginationGetXComEntries(TestXComEndpoint): + @pytest.mark.parametrize( + "query_params, expected_xcom_ids", + [ + ( + {"limit": "1"}, + ["TEST_XCOM_KEY0"], + ), + ( + {"limit": "2"}, + ["TEST_XCOM_KEY0", "TEST_XCOM_KEY1"], + ), + ( + {"offset": "5"}, + [ + "TEST_XCOM_KEY5", + "TEST_XCOM_KEY6", + "TEST_XCOM_KEY7", + "TEST_XCOM_KEY8", + "TEST_XCOM_KEY9", + ], + ), + ( + {"offset": "0"}, + [ + "TEST_XCOM_KEY0", + "TEST_XCOM_KEY1", + "TEST_XCOM_KEY2", + "TEST_XCOM_KEY3", + "TEST_XCOM_KEY4", + "TEST_XCOM_KEY5", + "TEST_XCOM_KEY6", + "TEST_XCOM_KEY7", + "TEST_XCOM_KEY8", + "TEST_XCOM_KEY9", + ], + ), + ( + {"limit": "1", "offset": "5"}, + ["TEST_XCOM_KEY5"], + ), + ( + {"limit": "1", "offset": "1"}, + ["TEST_XCOM_KEY1"], + ), + ( + {"limit": "2", "offset": "2"}, + ["TEST_XCOM_KEY2", "TEST_XCOM_KEY3"], + ), + ], + ) + def test_handle_limit_offset(self, query_params, expected_xcom_ids, test_client): + for i in range(10): + self._create_xcom(f"TEST_XCOM_KEY{i}", TEST_XCOM_VALUE) + response = test_client.get( + "/public/dags/~/dagRuns/~/taskInstances/~/xcomEntries", params=query_params + ) + assert response.status_code == 200 + response_data = response.json() + assert response_data["total_entries"] == 10 + conn_ids = [conn["key"] for conn in response_data["xcom_entries"] if conn] + assert conn_ids == expected_xcom_ids From 34341031c1d2f0389a54d713aeaf119feaa4388c Mon Sep 17 00:00:00 2001 From: Michael Smith-Chandler Date: Tue, 26 Nov 2024 11:15:17 +0000 Subject: [PATCH 3/5] Address pr comments --- airflow/api_fastapi/core_api/routes/public/xcom.py | 7 ++----- tests/api_fastapi/core_api/routes/public/test_xcom.py | 8 ++++---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow/api_fastapi/core_api/routes/public/xcom.py index 59ff2194e32ee..f0d4aca966847 100644 --- a/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -28,7 +28,6 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.xcom import ( XComCollection, - XComResponse, XComResponseNative, XComResponseString, ) @@ -135,7 +134,7 @@ def get_xcom_entries( query = query.where(XCom.key == xcom_key) query, total_entries = paginated_select( - select=query, + statement=query, filters=[], order_by=SortParam(["dag_id", "task_id", "run_id", "map_index", "key"], XCom), offset=offset, @@ -143,6 +142,4 @@ def get_xcom_entries( session=session, ) xcoms = session.scalars(query) - return XComCollection( - xcom_entries=[XComResponse.model_validate(xcom) for xcom in xcoms], total_entries=total_entries - ) + return XComCollection(xcom_entries=xcoms, total_entries=total_entries) diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py b/tests/api_fastapi/core_api/routes/public/test_xcom.py index 8697ee89fc01a..022987125fb7a 100644 --- a/tests/api_fastapi/core_api/routes/public/test_xcom.py +++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py @@ -211,6 +211,10 @@ def test_custom_xcom_deserialize( class TestGetXComEntries(TestXComEndpoint): + @pytest.fixture(autouse=True) + def setup(self) -> None: + self.clear_db() + def test_should_respond_200(self, test_client): self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, TEST_TASK_ID) response = test_client.get( @@ -417,10 +421,6 @@ def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id, mapped_ti= map_index=map_index, ) - @pytest.fixture(autouse=True) - def setup(self) -> None: - self.clear_db() - class TestPaginationGetXComEntries(TestXComEndpoint): @pytest.mark.parametrize( From ebdbf31d89858adf7a34ee4f7b9ed6bd906b8a80 Mon Sep 17 00:00:00 2001 From: Michael Smith-Chandler Date: Tue, 26 Nov 2024 16:27:33 +0000 Subject: [PATCH 4/5] Fix order by --- airflow/api_fastapi/core_api/routes/public/xcom.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow/api_fastapi/core_api/routes/public/xcom.py index f0d4aca966847..1d4b154fd87c1 100644 --- a/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -24,7 +24,7 @@ from sqlalchemy.orm import Session from airflow.api_fastapi.common.db.common import get_session, paginated_select -from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam +from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.xcom import ( XComCollection, @@ -135,11 +135,10 @@ def get_xcom_entries( query, total_entries = paginated_select( statement=query, - filters=[], - order_by=SortParam(["dag_id", "task_id", "run_id", "map_index", "key"], XCom), offset=offset, limit=limit, session=session, ) + query = query.order_by(XCom.dag_id, XCom.task_id, XCom.run_id, XCom.map_index, XCom.key) xcoms = session.scalars(query) return XComCollection(xcom_entries=xcoms, total_entries=total_entries) From f413add80d64049efceef97cdbe4518c7b616598 Mon Sep 17 00:00:00 2001 From: Michael Smith-Chandler Date: Tue, 26 Nov 2024 16:45:23 +0000 Subject: [PATCH 5/5] Update UI generated code --- airflow/ui/openapi-gen/queries/common.ts | 99 ++++++---- airflow/ui/openapi-gen/queries/prefetch.ts | 167 +++++++++++------ airflow/ui/openapi-gen/queries/queries.ts | 170 ++++++++++++------ airflow/ui/openapi-gen/queries/suspense.ts | 170 ++++++++++++------ .../ui/openapi-gen/requests/schemas.gen.ts | 62 +++++++ .../ui/openapi-gen/requests/services.gen.ts | 135 +++++++++----- airflow/ui/openapi-gen/requests/types.gen.ts | 149 ++++++++++----- 7 files changed, 664 insertions(+), 288 deletions(-) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 4696713d203c5..e4bc5f300d8a5 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1393,6 +1393,72 @@ export const UseProviderServiceGetProvidersKeyFn = ( } = {}, queryKey?: Array, ) => [useProviderServiceGetProvidersKey, ...(queryKey ?? [{ limit, offset }])]; +export type XcomServiceGetXcomEntryDefaultResponse = Awaited< + ReturnType +>; +export type XcomServiceGetXcomEntryQueryResult< + TData = XcomServiceGetXcomEntryDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry"; +export const UseXcomServiceGetXcomEntryKeyFn = ( + { + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + deserialize?: boolean; + mapIndex?: number; + stringify?: boolean; + taskId: string; + xcomKey: string; + }, + queryKey?: Array, +) => [ + useXcomServiceGetXcomEntryKey, + ...(queryKey ?? [ + { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey }, + ]), +]; +export type XcomServiceGetXcomEntriesDefaultResponse = Awaited< + ReturnType +>; +export type XcomServiceGetXcomEntriesQueryResult< + TData = XcomServiceGetXcomEntriesDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useXcomServiceGetXcomEntriesKey = "XcomServiceGetXcomEntries"; +export const UseXcomServiceGetXcomEntriesKeyFn = ( + { + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; + xcomKey?: string; + }, + queryKey?: Array, +) => [ + useXcomServiceGetXcomEntriesKey, + ...(queryKey ?? [ + { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }, + ]), +]; export type TaskServiceGetTasksDefaultResponse = Awaited< ReturnType >; @@ -1468,39 +1534,6 @@ export const UseVariableServiceGetVariablesKeyFn = ( useVariableServiceGetVariablesKey, ...(queryKey ?? [{ limit, offset, orderBy }]), ]; -export type XcomServiceGetXcomEntryDefaultResponse = Awaited< - ReturnType ->; -export type XcomServiceGetXcomEntryQueryResult< - TData = XcomServiceGetXcomEntryDefaultResponse, - TError = unknown, -> = UseQueryResult; -export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry"; -export const UseXcomServiceGetXcomEntryKeyFn = ( - { - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }: { - dagId: string; - dagRunId: string; - deserialize?: boolean; - mapIndex?: number; - stringify?: boolean; - taskId: string; - xcomKey: string; - }, - queryKey?: Array, -) => [ - useXcomServiceGetXcomEntryKey, - ...(queryKey ?? [ - { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey }, - ]), -]; export type MonitorServiceGetHealthDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index ff83018fc89d7..8f423b4c083de 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1891,6 +1891,118 @@ export const prefetchUseProviderServiceGetProviders = ( queryKey: Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () => ProviderService.getProviders({ limit, offset }), }); +/** + * Get Xcom Entry + * Get an XCom entry. + * @param data The data for the request. + * @param data.dagId + * @param data.taskId + * @param data.dagRunId + * @param data.xcomKey + * @param data.mapIndex + * @param data.deserialize + * @param data.stringify + * @returns unknown Successful Response + * @throws ApiError + */ +export const prefetchUseXcomServiceGetXcomEntry = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + deserialize?: boolean; + mapIndex?: number; + stringify?: boolean; + taskId: string; + xcomKey: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseXcomServiceGetXcomEntryKeyFn({ + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }), + queryFn: () => + XcomService.getXcomEntry({ + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }), + }); +/** + * Get Xcom Entries + * Get all XCom entries. + * + * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.xcomKey + * @param data.mapIndex + * @param data.limit + * @param data.offset + * @returns XComCollection Successful Response + * @throws ApiError + */ +export const prefetchUseXcomServiceGetXcomEntries = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; + xcomKey?: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }), + queryFn: () => + XcomService.getXcomEntries({ + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }), + }); /** * Get Tasks * Get tasks for DAG. @@ -1987,61 +2099,6 @@ export const prefetchUseVariableServiceGetVariables = ( }), queryFn: () => VariableService.getVariables({ limit, offset, orderBy }), }); -/** - * Get Xcom Entry - * Get an XCom entry. - * @param data The data for the request. - * @param data.dagId - * @param data.taskId - * @param data.dagRunId - * @param data.xcomKey - * @param data.mapIndex - * @param data.deserialize - * @param data.stringify - * @returns unknown Successful Response - * @throws ApiError - */ -export const prefetchUseXcomServiceGetXcomEntry = ( - queryClient: QueryClient, - { - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }: { - dagId: string; - dagRunId: string; - deserialize?: boolean; - mapIndex?: number; - stringify?: boolean; - taskId: string; - xcomKey: string; - }, -) => - queryClient.prefetchQuery({ - queryKey: Common.UseXcomServiceGetXcomEntryKeyFn({ - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }), - queryFn: () => - XcomService.getXcomEntry({ - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }), - }); /** * Get Health * @returns HealthInfoSchema Successful Response diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index ccc8bc1a12dd4..6ff3e83ccced2 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2247,6 +2247,120 @@ export const useProviderServiceGetProviders = < queryFn: () => ProviderService.getProviders({ limit, offset }) as TData, ...options, }); +/** + * Get Xcom Entry + * Get an XCom entry. + * @param data The data for the request. + * @param data.dagId + * @param data.taskId + * @param data.dagRunId + * @param data.xcomKey + * @param data.mapIndex + * @param data.deserialize + * @param data.stringify + * @returns unknown Successful Response + * @throws ApiError + */ +export const useXcomServiceGetXcomEntry = < + TData = Common.XcomServiceGetXcomEntryDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + deserialize?: boolean; + mapIndex?: number; + stringify?: boolean; + taskId: string; + xcomKey: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseXcomServiceGetXcomEntryKeyFn( + { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey }, + queryKey, + ), + queryFn: () => + XcomService.getXcomEntry({ + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }) as TData, + ...options, + }); +/** + * Get Xcom Entries + * Get all XCom entries. + * + * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.xcomKey + * @param data.mapIndex + * @param data.limit + * @param data.offset + * @returns XComCollection Successful Response + * @throws ApiError + */ +export const useXcomServiceGetXcomEntries = < + TData = Common.XcomServiceGetXcomEntriesDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; + xcomKey?: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn( + { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }, + queryKey, + ), + queryFn: () => + XcomService.getXcomEntries({ + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. @@ -2370,62 +2484,6 @@ export const useVariableServiceGetVariables = < VariableService.getVariables({ limit, offset, orderBy }) as TData, ...options, }); -/** - * Get Xcom Entry - * Get an XCom entry. - * @param data The data for the request. - * @param data.dagId - * @param data.taskId - * @param data.dagRunId - * @param data.xcomKey - * @param data.mapIndex - * @param data.deserialize - * @param data.stringify - * @returns unknown Successful Response - * @throws ApiError - */ -export const useXcomServiceGetXcomEntry = < - TData = Common.XcomServiceGetXcomEntryDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }: { - dagId: string; - dagRunId: string; - deserialize?: boolean; - mapIndex?: number; - stringify?: boolean; - taskId: string; - xcomKey: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useQuery({ - queryKey: Common.UseXcomServiceGetXcomEntryKeyFn( - { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey }, - queryKey, - ), - queryFn: () => - XcomService.getXcomEntry({ - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }) as TData, - ...options, - }); /** * Get Health * @returns HealthInfoSchema Successful Response diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 49125fef08de4..11386ab5d1fbc 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -2225,6 +2225,120 @@ export const useProviderServiceGetProvidersSuspense = < queryFn: () => ProviderService.getProviders({ limit, offset }) as TData, ...options, }); +/** + * Get Xcom Entry + * Get an XCom entry. + * @param data The data for the request. + * @param data.dagId + * @param data.taskId + * @param data.dagRunId + * @param data.xcomKey + * @param data.mapIndex + * @param data.deserialize + * @param data.stringify + * @returns unknown Successful Response + * @throws ApiError + */ +export const useXcomServiceGetXcomEntrySuspense = < + TData = Common.XcomServiceGetXcomEntryDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + deserialize?: boolean; + mapIndex?: number; + stringify?: boolean; + taskId: string; + xcomKey: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseXcomServiceGetXcomEntryKeyFn( + { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey }, + queryKey, + ), + queryFn: () => + XcomService.getXcomEntry({ + dagId, + dagRunId, + deserialize, + mapIndex, + stringify, + taskId, + xcomKey, + }) as TData, + ...options, + }); +/** + * Get Xcom Entries + * Get all XCom entries. + * + * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.xcomKey + * @param data.mapIndex + * @param data.limit + * @param data.offset + * @returns XComCollection Successful Response + * @throws ApiError + */ +export const useXcomServiceGetXcomEntriesSuspense = < + TData = Common.XcomServiceGetXcomEntriesDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }: { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number; + offset?: number; + taskId: string; + xcomKey?: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn( + { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }, + queryKey, + ), + queryFn: () => + XcomService.getXcomEntries({ + dagId, + dagRunId, + limit, + mapIndex, + offset, + taskId, + xcomKey, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. @@ -2348,62 +2462,6 @@ export const useVariableServiceGetVariablesSuspense = < VariableService.getVariables({ limit, offset, orderBy }) as TData, ...options, }); -/** - * Get Xcom Entry - * Get an XCom entry. - * @param data The data for the request. - * @param data.dagId - * @param data.taskId - * @param data.dagRunId - * @param data.xcomKey - * @param data.mapIndex - * @param data.deserialize - * @param data.stringify - * @returns unknown Successful Response - * @throws ApiError - */ -export const useXcomServiceGetXcomEntrySuspense = < - TData = Common.XcomServiceGetXcomEntryDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }: { - dagId: string; - dagRunId: string; - deserialize?: boolean; - mapIndex?: number; - stringify?: boolean; - taskId: string; - xcomKey: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useSuspenseQuery({ - queryKey: Common.UseXcomServiceGetXcomEntryKeyFn( - { dagId, dagRunId, deserialize, mapIndex, stringify, taskId, xcomKey }, - queryKey, - ), - queryFn: () => - XcomService.getXcomEntry({ - dagId, - dagRunId, - deserialize, - mapIndex, - stringify, - taskId, - xcomKey, - }) as TData, - ...options, - }); /** * Get Health * @returns HealthInfoSchema Successful Response diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 767b86fc82a4a..8002b9d37f6c0 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5064,6 +5064,68 @@ export const $VersionInfo = { description: "Version information serializer for responses.", } as const; +export const $XComCollection = { + properties: { + xcom_entries: { + items: { + $ref: "#/components/schemas/XComResponse", + }, + type: "array", + title: "Xcom Entries", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["xcom_entries", "total_entries"], + title: "XComCollection", + description: "List of XCom items.", +} as const; + +export const $XComResponse = { + properties: { + key: { + type: "string", + title: "Key", + }, + timestamp: { + type: "string", + format: "date-time", + title: "Timestamp", + }, + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + map_index: { + type: "integer", + title: "Map Index", + }, + task_id: { + type: "string", + title: "Task Id", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + }, + type: "object", + required: [ + "key", + "timestamp", + "logical_date", + "map_index", + "task_id", + "dag_id", + ], + title: "XComResponse", + description: "Serializer for a xcom item.", +} as const; + export const $XComResponseNative = { properties: { key: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 1f14639496d0c..d8cb33bceec9f 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -150,6 +150,10 @@ import type { PostPoolsResponse, GetProvidersData, GetProvidersResponse, + GetXcomEntryData, + GetXcomEntryResponse, + GetXcomEntriesData, + GetXcomEntriesResponse, GetTasksData, GetTasksResponse, GetTaskData, @@ -164,8 +168,6 @@ import type { GetVariablesResponse, PostVariableData, PostVariableResponse, - GetXcomEntryData, - GetXcomEntryResponse, GetHealthResponse, GetVersionResponse, } from "./types.gen"; @@ -2608,6 +2610,92 @@ export class ProviderService { } } +export class XcomService { + /** + * Get Xcom Entry + * Get an XCom entry. + * @param data The data for the request. + * @param data.dagId + * @param data.taskId + * @param data.dagRunId + * @param data.xcomKey + * @param data.mapIndex + * @param data.deserialize + * @param data.stringify + * @returns unknown Successful Response + * @throws ApiError + */ + public static getXcomEntry( + data: GetXcomEntryData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", + path: { + dag_id: data.dagId, + task_id: data.taskId, + dag_run_id: data.dagRunId, + xcom_key: data.xcomKey, + }, + query: { + map_index: data.mapIndex, + deserialize: data.deserialize, + stringify: data.stringify, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + + /** + * Get Xcom Entries + * Get all XCom entries. + * + * This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to retrieve XCom entries for all DAGs. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.xcomKey + * @param data.mapIndex + * @param data.limit + * @param data.offset + * @returns XComCollection Successful Response + * @throws ApiError + */ + public static getXcomEntries( + data: GetXcomEntriesData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + }, + query: { + xcom_key: data.xcomKey, + map_index: data.mapIndex, + limit: data.limit, + offset: data.offset, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } +} + export class TaskService { /** * Get Tasks @@ -2809,49 +2897,6 @@ export class VariableService { } } -export class XcomService { - /** - * Get Xcom Entry - * Get an XCom entry. - * @param data The data for the request. - * @param data.dagId - * @param data.taskId - * @param data.dagRunId - * @param data.xcomKey - * @param data.mapIndex - * @param data.deserialize - * @param data.stringify - * @returns unknown Successful Response - * @throws ApiError - */ - public static getXcomEntry( - data: GetXcomEntryData, - ): CancelablePromise { - return __request(OpenAPI, { - method: "GET", - url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", - path: { - dag_id: data.dagId, - task_id: data.taskId, - dag_run_id: data.dagRunId, - xcom_key: data.xcomKey, - }, - query: { - map_index: data.mapIndex, - deserialize: data.deserialize, - stringify: data.stringify, - }, - errors: { - 400: "Bad Request", - 401: "Unauthorized", - 403: "Forbidden", - 404: "Not Found", - 422: "Validation Error", - }, - }); - } -} - export class MonitorService { /** * Get Health diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index f130c187c711a..bdcce0157dce2 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1185,6 +1185,26 @@ export type VersionInfo = { git_version: string | null; }; +/** + * List of XCom items. + */ +export type XComCollection = { + xcom_entries: Array; + total_entries: number; +}; + +/** + * Serializer for a xcom item. + */ +export type XComResponse = { + key: string; + timestamp: string; + logical_date: string; + map_index: number; + task_id: string; + dag_id: string; +}; + /** * XCom response serializer with native return type. */ @@ -1866,6 +1886,30 @@ export type GetProvidersData = { export type GetProvidersResponse = ProviderCollectionResponse; +export type GetXcomEntryData = { + dagId: string; + dagRunId: string; + deserialize?: boolean; + mapIndex?: number; + stringify?: boolean; + taskId: string; + xcomKey: string; +}; + +export type GetXcomEntryResponse = XComResponseNative | XComResponseString; + +export type GetXcomEntriesData = { + dagId: string; + dagRunId: string; + limit?: number; + mapIndex?: number | null; + offset?: number; + taskId: string; + xcomKey?: string | null; +}; + +export type GetXcomEntriesResponse = XComCollection; + export type GetTasksData = { dagId: string; orderBy?: string; @@ -1914,18 +1958,6 @@ export type PostVariableData = { export type PostVariableResponse = VariableResponse; -export type GetXcomEntryData = { - dagId: string; - dagRunId: string; - deserialize?: boolean; - mapIndex?: number; - stringify?: boolean; - taskId: string; - xcomKey: string; -}; - -export type GetXcomEntryResponse = XComResponseNative | XComResponseString; - export type GetHealthResponse = HealthInfoSchema; export type GetVersionResponse = VersionInfo; @@ -3906,6 +3938,68 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}": { + get: { + req: GetXcomEntryData; + res: { + /** + * Successful Response + */ + 200: XComResponseNative | XComResponseString; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries": { + get: { + req: GetXcomEntriesData; + res: { + /** + * Successful Response + */ + 200: XComCollection; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/tasks": { get: { req: GetTasksData; @@ -4093,37 +4187,6 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}": { - get: { - req: GetXcomEntryData; - res: { - /** - * Successful Response - */ - 200: XComResponseNative | XComResponseString; - /** - * Bad Request - */ - 400: HTTPExceptionResponse; - /** - * Unauthorized - */ - 401: HTTPExceptionResponse; - /** - * Forbidden - */ - 403: HTTPExceptionResponse; - /** - * Not Found - */ - 404: HTTPExceptionResponse; - /** - * Validation Error - */ - 422: HTTPValidationError; - }; - }; - }; "/public/monitor/health": { get: { res: {