Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pydantic import Field, field_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse
from airflow.sdk import Param


Expand All @@ -45,7 +46,7 @@ class HITLDetailResponse(BaseModel):
class HITLDetail(BaseModel):
"""Schema for Human-in-the-loop detail."""

ti_id: str
task_instance: TaskInstanceResponse

# User Request Detail
options: list[str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9891,9 +9891,8 @@ components:
description: Serializer for Plugin FastAPI root middleware responses.
HITLDetail:
properties:
ti_id:
type: string
title: Ti Id
task_instance:
$ref: '#/components/schemas/TaskInstanceResponse'
options:
items:
type: string
Expand Down Expand Up @@ -9950,7 +9949,7 @@ components:
default: false
type: object
required:
- ti_id
- task_instance
- options
- subject
title: HITLDetail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import structlog
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
Expand Down Expand Up @@ -132,7 +133,11 @@ def _get_hitl_detail(
)

ti_id_str = str(task_instance.id)
hitl_detail_model = session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == ti_id_str))
hitl_detail_model = session.scalar(
select(HITLDetailModel)
.where(HITLDetailModel.ti_id == ti_id_str)
.options(joinedload(HITLDetailModel.task_instance))
)
if not hitl_detail_model:
log.error("Human-in-the-loop detail not found")
raise HTTPException(
Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/models/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship

from airflow.models.base import Base
from airflow.settings import json
Expand Down Expand Up @@ -53,6 +54,11 @@ class HITLDetail(Base):
default=None,
)
params_input = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
task_instance = relationship(
"TaskInstance",
lazy="joined",
back_populates="hitl_detail",
)

__table_args__ = (
ForeignKeyConstraint(
Expand Down
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ class TaskInstance(Base, LoggingMixin):
triggerer_job = association_proxy("trigger", "triggerer_job")
dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
rendered_task_instance_fields = relationship("RenderedTaskInstanceFields", lazy="noload", uselist=False)
hitl_detail = relationship("HITLDetail", lazy="noload", uselist=False)

run_after = association_proxy("dag_run", "run_after")
logical_date = association_proxy("dag_run", "logical_date")
task_instance_note = relationship(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3410,9 +3410,8 @@ export const $FastAPIRootMiddlewareResponse = {

export const $HITLDetail = {
properties: {
ti_id: {
type: 'string',
title: 'Ti Id'
task_instance: {
'$ref': '#/components/schemas/TaskInstanceResponse'
},
options: {
items: {
Expand Down Expand Up @@ -3509,7 +3508,7 @@ export const $HITLDetail = {
}
},
type: 'object',
required: ['ti_id', 'options', 'subject'],
required: ['task_instance', 'options', 'subject'],
title: 'HITLDetail',
description: 'Schema for Human-in-the-loop detail.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ export type FastAPIRootMiddlewareResponse = {
* Schema for Human-in-the-loop detail.
*/
export type HITLDetail = {
ti_id: string;
task_instance: TaskInstanceResponse;
options: Array<(string)>;
subject: string;
body?: string | null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from unittest import mock

import pytest
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -111,8 +113,51 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]:
"chosen_options": None,
"response_received": False,
"subject": "This is subject",
"ti_id": sample_ti.id,
"user_id": None,
"task_instance": {
"dag_display_name": "dag",
"dag_id": "dag",
"dag_run_id": "test",
"dag_version": {
"bundle_name": "dag_maker",
"bundle_url": None,
"bundle_version": None,
"created_at": mock.ANY,
"dag_display_name": "dag",
"dag_id": "dag",
"id": mock.ANY,
"version_number": 1,
},
"duration": None,
"end_date": None,
"executor": None,
"executor_config": "{}",
"hostname": "",
"id": sample_ti.id,
"logical_date": mock.ANY,
"map_index": -1,
"max_tries": 0,
"note": None,
"operator": "EmptyOperator",
"pid": None,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 1,
"queue": "default",
"queued_when": None,
"rendered_fields": {},
"rendered_map_index": None,
"run_after": mock.ANY,
"scheduled_when": None,
"start_date": None,
"state": None,
"task_display_name": "op1",
"task_id": "op1",
"trigger": None,
"triggerer_job": None,
"try_number": 0,
"unixname": "root",
},
}


Expand Down
56 changes: 28 additions & 28 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,34 +573,6 @@ class FastAPIRootMiddlewareResponse(BaseModel):
name: Annotated[str, Field(title="Name")]


class HITLDetail(BaseModel):
"""
Schema for Human-in-the-loop detail.
"""

ti_id: Annotated[str, Field(title="Ti Id")]
options: Annotated[list[str], Field(title="Options")]
subject: Annotated[str, Field(title="Subject")]
body: Annotated[str | None, Field(title="Body")] = None
defaults: Annotated[list[str] | None, Field(title="Defaults")] = None
multiple: Annotated[bool | None, Field(title="Multiple")] = False
params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
user_id: Annotated[str | None, Field(title="User Id")] = None
response_at: Annotated[datetime | None, Field(title="Response At")] = None
chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None
params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None
response_received: Annotated[bool | None, Field(title="Response Received")] = False


class HITLDetailCollection(BaseModel):
"""
Schema for a collection of Human-in-the-loop details.
"""

hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")]
total_entries: Annotated[int, Field(title="Total Entries")]


class HITLDetailResponse(BaseModel):
"""
Response of updating a Human-in-the-loop detail.
Expand Down Expand Up @@ -1829,6 +1801,34 @@ class DagStatsCollectionResponse(BaseModel):
total_entries: Annotated[int, Field(title="Total Entries")]


class HITLDetail(BaseModel):
"""
Schema for Human-in-the-loop detail.
"""

task_instance: TaskInstanceResponse
options: Annotated[list[str], Field(title="Options")]
subject: Annotated[str, Field(title="Subject")]
body: Annotated[str | None, Field(title="Body")] = None
defaults: Annotated[list[str] | None, Field(title="Defaults")] = None
multiple: Annotated[bool | None, Field(title="Multiple")] = False
params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
user_id: Annotated[str | None, Field(title="User Id")] = None
response_at: Annotated[datetime | None, Field(title="Response At")] = None
chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None
params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None
response_received: Annotated[bool | None, Field(title="Response Received")] = False


class HITLDetailCollection(BaseModel):
"""
Schema for a collection of Human-in-the-loop details.
"""

hitl_details: Annotated[list[HITLDetail], Field(title="Hitl Details")]
total_entries: Annotated[int, Field(title="Total Entries")]


class PluginCollectionResponse(BaseModel):
"""
Plugin Collection serializer.
Expand Down
1 change: 1 addition & 0 deletions scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def compare_attributes(path1, path2):
"triggerer_job",
"note",
"rendered_task_instance_fields",
"hitl_detail",
# Storing last heartbeat for historic TIs is not interesting/useful
"last_heartbeat_at",
"id",
Expand Down