From 3dbc149d1c2db7bcb6980c823c57f40e4a45a2ca Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 15 Jul 2025 15:16:26 +0200 Subject: [PATCH 1/4] feat(hitl): include task_instance detail in hitl detail response --- .../api_fastapi/core_api/datamodels/hitl.py | 3 +- .../core_api/routes/public/hitl.py | 7 ++- airflow-core/src/airflow/models/hitl.py | 6 +++ .../src/airflow/models/taskinstance.py | 2 + .../core_api/routes/public/test_hitl.py | 47 ++++++++++++++++++- 5 files changed, 62 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py index 88ad702316423..7b974bc15fba0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py @@ -23,6 +23,7 @@ from pydantic import Field, field_validator from airflow.api_fastapi.core_api.base import BaseModel +from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse from airflow.sdk import Param @@ -45,7 +46,7 @@ class HITLDetailResponse(BaseModel): class HITLDetail(BaseModel): """Schema for Human-in-the-loop detail.""" - ti_id: str + task_instance: TaskInstanceResponse # User Request Detail options: list[str] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 78c7604b51677..1d4d20fe64ea8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -19,6 +19,7 @@ import structlog from fastapi import Depends, HTTPException, status from sqlalchemy import select +from sqlalchemy.orm import joinedload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import SessionDep, paginated_select @@ -132,7 +133,11 @@ def _get_hitl_detail( ) ti_id_str = str(task_instance.id) - hitl_detail_model = session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == ti_id_str)) + hitl_detail_model = session.scalar( + select(HITLDetailModel) + .where(HITLDetailModel.ti_id == ti_id_str) + .options(joinedload(HITLDetailModel.task_instance)) + ) if not hitl_detail_model: log.error("Human-in-the-loop detail not found") raise HTTPException( diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index 9d060ba1c19d7..582edd3a6e381 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -20,6 +20,7 @@ from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text from sqlalchemy.dialects import postgresql from sqlalchemy.ext.hybrid import hybrid_property +from sqlalchemy.orm import relationship from airflow.models.base import Base from airflow.settings import json @@ -53,6 +54,11 @@ class HITLDetail(Base): default=None, ) params_input = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) + task_instance = relationship( + "TaskInstance", + lazy="joined", + back_populates="hitl_detail", + ) __table_args__ = ( ForeignKeyConstraint( diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index a2af30a24d4ec..40ad420d43047 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -555,6 +555,8 @@ class TaskInstance(Base, LoggingMixin): triggerer_job = association_proxy("trigger", "triggerer_job") dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True) rendered_task_instance_fields = relationship("RenderedTaskInstanceFields", lazy="noload", uselist=False) + hitl_detail = relationship("HITLDetail", lazy="noload", uselist=False) + run_after = association_proxy("dag_run", "run_after") logical_date = association_proxy("dag_run", "logical_date") task_instance_note = relationship( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index 3fa34a5779a37..e851bca4b25fa 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from unittest import mock + import pytest from sqlalchemy.orm import Session @@ -111,8 +113,51 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "chosen_options": None, "response_received": False, "subject": "This is subject", - "ti_id": sample_ti.id, "user_id": None, + "task_instance": { + "dag_display_name": "dag", + "dag_id": "dag", + "dag_run_id": "test", + "dag_version": { + "bundle_name": "dag_maker", + "bundle_url": None, + "bundle_version": None, + "created_at": mock.ANY, + "dag_display_name": "dag", + "dag_id": "dag", + "id": mock.ANY, + "version_number": 1, + }, + "duration": None, + "end_date": None, + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": sample_ti.id, + "logical_date": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": None, + "operator": "EmptyOperator", + "pid": None, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 1, + "queue": "default", + "queued_when": None, + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": mock.ANY, + "scheduled_when": None, + "start_date": None, + "state": None, + "task_display_name": "op1", + "task_id": "op1", + "trigger": None, + "triggerer_job": None, + "try_number": 0, + "unixname": "root", + }, } From d79bbbfcdf337087239fbb9c22cfb6d5a39d0287 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 16 Jul 2025 11:01:49 +0200 Subject: [PATCH 2/4] fixup! feat(hitl): include task_instance detail in hitl detail response --- scripts/ci/pre_commit/check_ti_vs_tis_attributes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py index 401bbbc076e2e..d29c11235a1b5 100755 --- a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py +++ b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py @@ -56,6 +56,7 @@ def compare_attributes(path1, path2): "triggerer_job", "note", "rendered_task_instance_fields", + "hitl_detail", # Storing last heartbeat for historic TIs is not interesting/useful "last_heartbeat_at", "id", From 08afcf6612c74712012909fb8f457d539900af17 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 16 Jul 2025 11:09:34 +0200 Subject: [PATCH 3/4] fixup! fixup! feat(hitl): include task_instance detail in hitl detail response --- .../openapi/v2-rest-api-generated.yaml | 7 +-- .../airflowctl/api/datamodels/generated.py | 56 +++++++++---------- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index ae9a645fc2e0f..f2fb07734e9e4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -9891,9 +9891,8 @@ components: description: Serializer for Plugin FastAPI root middleware responses. HITLDetail: properties: - ti_id: - type: string - title: Ti Id + task_instance: + $ref: '#/components/schemas/TaskInstanceResponse' options: items: type: string @@ -9950,7 +9949,7 @@ components: default: false type: object required: - - ti_id + - task_instance - options - subject title: HITLDetail diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 0824759850a68..1fecf1b6b3a4e 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -573,34 +573,6 @@ class FastAPIRootMiddlewareResponse(BaseModel): name: Annotated[str, Field(title="Name")] -class HITLDetail(BaseModel): - """ - Schema for Human-in-the-loop detail. - """ - - ti_id: Annotated[str, Field(title="Ti Id")] - options: Annotated[list[str], Field(title="Options")] - subject: Annotated[str, Field(title="Subject")] - body: Annotated[str | None, Field(title="Body")] = None - defaults: Annotated[list[str] | None, Field(title="Defaults")] = None - multiple: Annotated[bool | None, Field(title="Multiple")] = False - params: Annotated[dict[str, Any] | None, Field(title="Params")] = None - user_id: Annotated[str | None, Field(title="User Id")] = None - response_at: Annotated[datetime | None, Field(title="Response At")] = None - chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None - params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None - response_received: Annotated[bool | None, Field(title="Response Received")] = False - - -class HITLDetailCollection(BaseModel): - """ - Schema for a collection of Human-in-the-loop details. - """ - - hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")] - total_entries: Annotated[int, Field(title="Total Entries")] - - class HITLDetailResponse(BaseModel): """ Response of updating a Human-in-the-loop detail. @@ -1829,6 +1801,34 @@ class DagStatsCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class HITLDetail(BaseModel): + """ + Schema for Human-in-the-loop detail. + """ + + task_instance: TaskInstanceResponse + options: Annotated[list[str], Field(title="Options")] + subject: Annotated[str, Field(title="Subject")] + body: Annotated[str | None, Field(title="Body")] = None + defaults: Annotated[list[str] | None, Field(title="Defaults")] = None + multiple: Annotated[bool | None, Field(title="Multiple")] = False + params: Annotated[dict[str, Any] | None, Field(title="Params")] = None + user_id: Annotated[str | None, Field(title="User Id")] = None + response_at: Annotated[datetime | None, Field(title="Response At")] = None + chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None + params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + response_received: Annotated[bool | None, Field(title="Response Received")] = False + + +class HITLDetailCollection(BaseModel): + """ + Schema for a collection of Human-in-the-loop details. + """ + + hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")] + total_entries: Annotated[int, Field(title="Total Entries")] + + class PluginCollectionResponse(BaseModel): """ Plugin Collection serializer. From b31186f4041239ffdfac48e72e60a795281b71c4 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 16 Jul 2025 11:21:02 +0200 Subject: [PATCH 4/4] fixup! fixup! fixup! feat(hitl): include task_instance detail in hitl detail response --- .../src/airflow/ui/openapi-gen/requests/schemas.gen.ts | 7 +++---- .../src/airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 2e31d61df5ae1..a26e0d4960182 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3410,9 +3410,8 @@ export const $FastAPIRootMiddlewareResponse = { export const $HITLDetail = { properties: { - ti_id: { - type: 'string', - title: 'Ti Id' + task_instance: { + '$ref': '#/components/schemas/TaskInstanceResponse' }, options: { items: { @@ -3509,7 +3508,7 @@ export const $HITLDetail = { } }, type: 'object', - required: ['ti_id', 'options', 'subject'], + required: ['task_instance', 'options', 'subject'], title: 'HITLDetail', description: 'Schema for Human-in-the-loop detail.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 591ce7884373c..17e7de16780f7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -921,7 +921,7 @@ export type FastAPIRootMiddlewareResponse = { * Schema for Human-in-the-loop detail. */ export type HITLDetail = { - ti_id: string; + task_instance: TaskInstanceResponse; options: Array<(string)>; subject: string; body?: string | null;