diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py index 88ad702316423..7b974bc15fba0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py @@ -23,6 +23,7 @@ from pydantic import Field, field_validator from airflow.api_fastapi.core_api.base import BaseModel +from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse from airflow.sdk import Param @@ -45,7 +46,7 @@ class HITLDetailResponse(BaseModel): class HITLDetail(BaseModel): """Schema for Human-in-the-loop detail.""" - ti_id: str + task_instance: TaskInstanceResponse # User Request Detail options: list[str] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index ae9a645fc2e0f..f2fb07734e9e4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -9891,9 +9891,8 @@ components: description: Serializer for Plugin FastAPI root middleware responses. HITLDetail: properties: - ti_id: - type: string - title: Ti Id + task_instance: + $ref: '#/components/schemas/TaskInstanceResponse' options: items: type: string @@ -9950,7 +9949,7 @@ components: default: false type: object required: - - ti_id + - task_instance - options - subject title: HITLDetail diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 78c7604b51677..1d4d20fe64ea8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -19,6 +19,7 @@ import structlog from fastapi import Depends, HTTPException, status from sqlalchemy import select +from sqlalchemy.orm import joinedload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import SessionDep, paginated_select @@ -132,7 +133,11 @@ def _get_hitl_detail( ) ti_id_str = str(task_instance.id) - hitl_detail_model = session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == ti_id_str)) + hitl_detail_model = session.scalar( + select(HITLDetailModel) + .where(HITLDetailModel.ti_id == ti_id_str) + .options(joinedload(HITLDetailModel.task_instance)) + ) if not hitl_detail_model: log.error("Human-in-the-loop detail not found") raise HTTPException( diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index 9d060ba1c19d7..582edd3a6e381 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -20,6 +20,7 @@ from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text from sqlalchemy.dialects import postgresql from sqlalchemy.ext.hybrid import hybrid_property +from sqlalchemy.orm import relationship from airflow.models.base import Base from airflow.settings import json @@ -53,6 +54,11 @@ class HITLDetail(Base): default=None, ) params_input = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) + task_instance = relationship( + "TaskInstance", + lazy="joined", + back_populates="hitl_detail", + ) __table_args__ = ( ForeignKeyConstraint( diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index a2af30a24d4ec..40ad420d43047 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -555,6 +555,8 @@ class TaskInstance(Base, LoggingMixin): triggerer_job = association_proxy("trigger", "triggerer_job") dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True) rendered_task_instance_fields = relationship("RenderedTaskInstanceFields", lazy="noload", uselist=False) + hitl_detail = relationship("HITLDetail", lazy="noload", uselist=False) + run_after = association_proxy("dag_run", "run_after") logical_date = association_proxy("dag_run", "logical_date") task_instance_note = relationship( diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 2e31d61df5ae1..a26e0d4960182 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3410,9 +3410,8 @@ export const $FastAPIRootMiddlewareResponse = { export const $HITLDetail = { properties: { - ti_id: { - type: 'string', - title: 'Ti Id' + task_instance: { + '$ref': '#/components/schemas/TaskInstanceResponse' }, options: { items: { @@ -3509,7 +3508,7 @@ export const $HITLDetail = { } }, type: 'object', - required: ['ti_id', 'options', 'subject'], + required: ['task_instance', 'options', 'subject'], title: 'HITLDetail', description: 'Schema for Human-in-the-loop detail.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 591ce7884373c..17e7de16780f7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -921,7 +921,7 @@ export type FastAPIRootMiddlewareResponse = { * Schema for Human-in-the-loop detail. */ export type HITLDetail = { - ti_id: string; + task_instance: TaskInstanceResponse; options: Array<(string)>; subject: string; body?: string | null; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index 3fa34a5779a37..e851bca4b25fa 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from unittest import mock + import pytest from sqlalchemy.orm import Session @@ -111,8 +113,51 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "chosen_options": None, "response_received": False, "subject": "This is subject", - "ti_id": sample_ti.id, "user_id": None, + "task_instance": { + "dag_display_name": "dag", + "dag_id": "dag", + "dag_run_id": "test", + "dag_version": { + "bundle_name": "dag_maker", + "bundle_url": None, + "bundle_version": None, + "created_at": mock.ANY, + "dag_display_name": "dag", + "dag_id": "dag", + "id": mock.ANY, + "version_number": 1, + }, + "duration": None, + "end_date": None, + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": sample_ti.id, + "logical_date": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": None, + "operator": "EmptyOperator", + "pid": None, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 1, + "queue": "default", + "queued_when": None, + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": mock.ANY, + "scheduled_when": None, + "start_date": None, + "state": None, + "task_display_name": "op1", + "task_id": "op1", + "trigger": None, + "triggerer_job": None, + "try_number": 0, + "unixname": "root", + }, } diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 0824759850a68..1fecf1b6b3a4e 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -573,34 +573,6 @@ class FastAPIRootMiddlewareResponse(BaseModel): name: Annotated[str, Field(title="Name")] -class HITLDetail(BaseModel): - """ - Schema for Human-in-the-loop detail. - """ - - ti_id: Annotated[str, Field(title="Ti Id")] - options: Annotated[list[str], Field(title="Options")] - subject: Annotated[str, Field(title="Subject")] - body: Annotated[str | None, Field(title="Body")] = None - defaults: Annotated[list[str] | None, Field(title="Defaults")] = None - multiple: Annotated[bool | None, Field(title="Multiple")] = False - params: Annotated[dict[str, Any] | None, Field(title="Params")] = None - user_id: Annotated[str | None, Field(title="User Id")] = None - response_at: Annotated[datetime | None, Field(title="Response At")] = None - chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None - params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None - response_received: Annotated[bool | None, Field(title="Response Received")] = False - - -class HITLDetailCollection(BaseModel): - """ - Schema for a collection of Human-in-the-loop details. - """ - - hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")] - total_entries: Annotated[int, Field(title="Total Entries")] - - class HITLDetailResponse(BaseModel): """ Response of updating a Human-in-the-loop detail. @@ -1829,6 +1801,34 @@ class DagStatsCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class HITLDetail(BaseModel): + """ + Schema for Human-in-the-loop detail. + """ + + task_instance: TaskInstanceResponse + options: Annotated[list[str], Field(title="Options")] + subject: Annotated[str, Field(title="Subject")] + body: Annotated[str | None, Field(title="Body")] = None + defaults: Annotated[list[str] | None, Field(title="Defaults")] = None + multiple: Annotated[bool | None, Field(title="Multiple")] = False + params: Annotated[dict[str, Any] | None, Field(title="Params")] = None + user_id: Annotated[str | None, Field(title="User Id")] = None + response_at: Annotated[datetime | None, Field(title="Response At")] = None + chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None + params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + response_received: Annotated[bool | None, Field(title="Response Received")] = False + + +class HITLDetailCollection(BaseModel): + """ + Schema for a collection of Human-in-the-loop details. + """ + + hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")] + total_entries: Annotated[int, Field(title="Total Entries")] + + class PluginCollectionResponse(BaseModel): """ Plugin Collection serializer. diff --git a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py index 401bbbc076e2e..d29c11235a1b5 100755 --- a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py +++ b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py @@ -56,6 +56,7 @@ def compare_attributes(path1, path2): "triggerer_job", "note", "rendered_task_instance_fields", + "hitl_detail", # Storing last heartbeat for historic TIs is not interesting/useful "last_heartbeat_at", "id",