diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 8bfb32a3713df..e6031ef83b247 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -625f362919679fe85b2bfb3f9e053261248ac6ec7a974eee51012e55a8105b94 \ No newline at end of file +b31700355c6c8e073896b9daba68b09c252bf1a05f535ef40aa0cb927d12e9ce \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index e6776c5f1538d..c9e62976b5e90 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -776,7 +776,6 @@ dag_version_id [UUID] - NOT NULL duration @@ -1894,7 +1893,7 @@ dag_version--task_instance 0..N -1 +{0,1} diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index a27474fdead69..cf9363c316f39 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -47,9 +47,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``09fa89ba1710`` | ``40f7c30a228b`` | ``3.1.0`` | Add trigger_id to deadline. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``40f7c30a228b`` | ``5d3072c51bac`` | ``3.1.0`` | Add Human In the Loop Detail table. | -+-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``5d3072c51bac`` | ``ffdb0566c7c0`` | ``3.1.0`` | Make dag_version_id non-nullable in TaskInstance. | +| ``40f7c30a228b`` | ``ffdb0566c7c0`` | ``3.1.0`` | Add Human In the Loop Detail table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``ffdb0566c7c0`` | ``66a7743fe20e`` | ``3.1.0`` | Add dag_favorite table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index 3611df275e752..fb87fb66c0aad 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -44,7 +44,6 @@ class TaskInstanceResponse(BaseModel): id: str task_id: str dag_id: str - dag_version: DagVersionResponse run_id: str = Field(alias="dag_run_id") map_index: int logical_date: datetime | None @@ -77,6 +76,7 @@ class TaskInstanceResponse(BaseModel): ) trigger: TriggerResponse | None queued_by_job: JobResponse | None = Field(alias="triggerer_job") + dag_version: DagVersionResponse | None class TaskInstanceCollectionResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 219da4bd06fa7..f42e1846bdfd3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -11202,8 +11202,6 @@ components: dag_id: type: string title: Dag Id - dag_version: - $ref: '#/components/schemas/DagVersionResponse' dag_run_id: type: string title: Dag Run Id @@ -11331,12 +11329,15 @@ components: anyOf: - $ref: '#/components/schemas/JobResponse' - type: 'null' + dag_version: + anyOf: + - $ref: '#/components/schemas/DagVersionResponse' + - type: 'null' type: object required: - id - task_id - dag_id - - dag_version - dag_run_id - map_index - logical_date @@ -11365,6 +11366,7 @@ components: - rendered_map_index - trigger - triggerer_job + - dag_version title: TaskInstanceResponse description: TaskInstance serializer for responses. TaskInstanceState: diff --git a/airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_human_in_the_loop_response.py b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py similarity index 98% rename from airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_human_in_the_loop_response.py rename to airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py index 61f950f5d120e..4b1c5a36e8994 100644 --- a/airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_human_in_the_loop_response.py +++ b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py @@ -20,7 +20,7 @@ Add Human In the Loop Detail table. Revision ID: 40f7c30a228b -Revises: 5d3072c51bac +Revises: ffdb0566c7c0 Create Date: 2025-07-04 15:05:19.459197 """ @@ -37,7 +37,7 @@ # revision identifiers, used by Alembic. revision = "40f7c30a228b" -down_revision = "5d3072c51bac" +down_revision = "ffdb0566c7c0" branch_labels = None depends_on = None airflow_version = "3.1.0" diff --git a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py deleted file mode 100644 index cbd183f7b8f43..0000000000000 --- a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py +++ /dev/null @@ -1,81 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Make dag_version_id non-nullable in TaskInstance. - -Revision ID: 5d3072c51bac -Revises: ffdb0566c7c0 -Create Date: 2025-05-20 10:38:25.635779 - -""" - -from __future__ import annotations - -import sqlalchemy as sa -from alembic import op -from sqlalchemy_utils import UUIDType - -# revision identifiers, used by Alembic. -revision = "5d3072c51bac" -down_revision = "ffdb0566c7c0" -branch_labels = None -depends_on = None -airflow_version = "3.1.0" - - -def upgrade(): - """Apply make dag_version_id non-nullable in TaskInstance.""" - conn = op.get_bind() - if conn.dialect.name == "postgresql": - update_query = sa.text(""" - UPDATE task_instance - SET dag_version_id = latest_versions.id - FROM ( - SELECT DISTINCT ON (dag_id) dag_id, id - FROM dag_version - ORDER BY dag_id, created_at DESC - ) latest_versions - WHERE task_instance.dag_id = latest_versions.dag_id - AND task_instance.dag_version_id IS NULL - """) - else: - update_query = sa.text(""" - UPDATE task_instance - SET dag_version_id = ( - SELECT id FROM ( - SELECT id, dag_id, - ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY created_at DESC) as rn - FROM dag_version - ) ranked_versions - WHERE ranked_versions.dag_id = task_instance.dag_id - AND ranked_versions.rn = 1 - ) - WHERE task_instance.dag_version_id IS NULL - """) - - op.execute(update_query) - - with op.batch_alter_table("task_instance", schema=None) as batch_op: - batch_op.alter_column("dag_version_id", existing_type=UUIDType(binary=False), nullable=False) - - -def downgrade(): - """Unapply make dag_version_id non-nullable in TaskInstance.""" - with op.batch_alter_table("task_instance", schema=None) as batch_op: - batch_op.alter_column("dag_version_id", existing_type=UUIDType(binary=False), nullable=True) diff --git a/airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_trigger_id_to_deadline.py b/airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_trigger_id_to_deadline.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_trigger_id_to_deadline.py rename to airflow-core/src/airflow/migrations/versions/0077_3_1_0_add_trigger_id_to_deadline.py diff --git a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py b/airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_callback_state_to_deadline.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_callback_state_to_deadline.py rename to airflow-core/src/airflow/migrations/versions/0078_3_1_0_add_callback_state_to_deadline.py diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py rename to airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py diff --git a/airflow-core/src/airflow/migrations/versions/0081_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0081_3_1_0_modify_deadline_callback_schema.py rename to airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 6dcb7f9b08dc2..3f4dc54455025 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -527,7 +527,8 @@ class TaskInstance(Base, LoggingMixin): _task_display_property_value = Column("task_display_name", String(2000), nullable=True) dag_version_id = Column( - UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="RESTRICT"), nullable=False + UUIDType(binary=False), + ForeignKey("dag_version.id", ondelete="RESTRICT"), ) dag_version = relationship("DagVersion", back_populates="task_instances") diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 58013b0e48d10..abfcc81b07990 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4752,9 +4752,6 @@ export const $TaskInstanceResponse = { type: 'string', title: 'Dag Id' }, - dag_version: { - '$ref': '#/components/schemas/DagVersionResponse' - }, dag_run_id: { type: 'string', title: 'Dag Run Id' @@ -5000,10 +4997,20 @@ export const $TaskInstanceResponse = { type: 'null' } ] + }, + dag_version: { + anyOf: [ + { + '$ref': '#/components/schemas/DagVersionResponse' + }, + { + type: 'null' + } + ] } }, type: 'object', - required: ['id', 'task_id', 'dag_id', 'dag_version', 'dag_run_id', 'map_index', 'logical_date', 'run_after', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'note', 'rendered_map_index', 'trigger', 'triggerer_job'], + required: ['id', 'task_id', 'dag_id', 'dag_run_id', 'map_index', 'logical_date', 'run_after', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'note', 'rendered_map_index', 'trigger', 'triggerer_job', 'dag_version'], title: 'TaskInstanceResponse', description: 'TaskInstance serializer for responses.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 31107d46e5e00..b0f1a6e5d27ac 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1299,7 +1299,6 @@ export type TaskInstanceResponse = { id: string; task_id: string; dag_id: string; - dag_version: DagVersionResponse; dag_run_id: string; map_index: number; logical_date: string | null; @@ -1331,6 +1330,7 @@ export type TaskInstanceResponse = { }; trigger: TriggerResponse | null; triggerer_job: JobResponse | null; + dag_version: DagVersionResponse | null; }; /** diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx index 1f4e1886ea7e9..62c0426790cb6 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx @@ -98,7 +98,7 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => { // Check if bundle versions are different const currentDagBundleVersion = dagDetails?.bundle_version; - const taskInstanceDagVersionBundleVersion = taskInstance.dag_version.bundle_version; + const taskInstanceDagVersionBundleVersion = taskInstance.dag_version?.bundle_version; const bundleVersionsDiffer = currentDagBundleVersion !== taskInstanceDagVersionBundleVersion; const shouldShowBundleVersionOption = bundleVersionsDiffer && diff --git a/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts b/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts index 2ed274ef4b54a..fb33fca1719a0 100644 --- a/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts +++ b/airflow-core/src/airflow/ui/src/hooks/useSelectedVersion.ts @@ -82,7 +82,7 @@ const useSelectedVersion = (): number | undefined => { const selectedVersionNumber = selectedVersionUrl ?? - (mappedTaskInstanceData ? mappedTaskInstanceData.dag_version.version_number : undefined) ?? + mappedTaskInstanceData?.dag_version?.version_number ?? (runData?.dag_versions ?? []).at(-1)?.version_number ?? dagData?.latest_dag_version?.version_number; diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index cdc132c6e58d4..ac72e6da31f30 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1618,7 +1618,6 @@ class TaskInstanceResponse(BaseModel): id: Annotated[str, Field(title="Id")] task_id: Annotated[str, Field(title="Task Id")] dag_id: Annotated[str, Field(title="Dag Id")] - dag_version: DagVersionResponse dag_run_id: Annotated[str, Field(title="Dag Run Id")] map_index: Annotated[int, Field(title="Map Index")] logical_date: Annotated[datetime | None, Field(title="Logical Date")] = None @@ -1648,6 +1647,7 @@ class TaskInstanceResponse(BaseModel): rendered_fields: Annotated[dict[str, Any] | None, Field(title="Rendered Fields")] = None trigger: TriggerResponse | None = None triggerer_job: JobResponse | None = None + dag_version: DagVersionResponse | None = None class TaskResponse(BaseModel):