Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from datetime import datetime
from typing import Generic, Literal, TypeVar

from pydantic import computed_field
from pydantic import AliasPath, Field, computed_field

from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.base import BaseModel
Expand Down Expand Up @@ -79,6 +79,7 @@ class GridRunsResponse(BaseModel):
run_after: datetime
state: TaskInstanceState | None
run_type: DagRunType
dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name"))

@computed_field
def duration(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1778,6 +1778,9 @@ components:
- type: 'null'
run_type:
$ref: '#/components/schemas/DagRunType'
dag_display_name:
type: string
title: Dag Display Name
duration:
type: integer
title: Duration
Expand All @@ -1792,6 +1795,7 @@ components:
- run_after
- state
- run_type
- dag_display_name
- duration
title: GridRunsResponse
description: Base Node serializer for responses.
Expand Down
14 changes: 3 additions & 11 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import structlog
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
Expand Down Expand Up @@ -217,16 +218,7 @@ def get_grid_runs(
) -> list[GridRunsResponse]:
"""Get info about a run for the grid."""
# Retrieve, sort the previous DAG Runs
base_query = select(
DagRun.dag_id,
DagRun.run_id,
DagRun.queued_at,
DagRun.start_date,
DagRun.end_date,
DagRun.run_after,
DagRun.state,
DagRun.run_type,
).where(DagRun.dag_id == dag_id)
base_query = select(DagRun).options(joinedload(DagRun.dag_model)).where(DagRun.dag_id == dag_id)

Comment on lines 220 to 222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if selecting all fields of DagRun model will introduce long loading time for Gride View or not?

As Grid view optimization #51805 only select necessary fields for this route.
Maybe there isn't large difference, but it would be nice to have a small benchmark to find out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my test on my local machine, it does increase the load time 0.1s in average. Not sure if this is ok in here.

before (avg 3.95s in 10 time tests)

Screenshot 2025-08-22 at 9 13 02 PM

after (avg 4.05 in 10 time tests)

Screenshot 2025-08-22 at 9 07 46 PM

Copy link
Member

@jason810496 jason810496 Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update!
How about taking Dag Example from Grid view optimization #51805 as baseline?

Or is this the same Dag as #51805? The dag name seems similiar.

Copy link
Member Author

@guan404ming guan404ming Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My computer is too old to run the original PR example smoothly 🥲, so I simplified it before testing. The simplified dag I used is based on the same idea, but not exactly identical to the one in #51805. The only difference is that the number of iterations is reduced to 5.

from __future__ import annotations

import datetime

import pendulum

from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, TaskGroup, chain

with DAG(
    dag_id="big_hello",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    params={"example_key": "example_value"},
) as dag:
    for i in range(5):
        with TaskGroup(f"group_{i}"):
            EmptyOperator(task_id="hello")
            with TaskGroup(f"group_{i}2"):
                chain([EmptyOperator(task_id=f"empty_{j}") for j in range(5)])
                chain([EmptyOperator(task_id=f"empty2_{j}") for j in range(5)])
                # EmptyOperator.partial(task_id=f"hello2").expand(doc=list(range(5)))
            with TaskGroup(f"group_{i}3"):
                chain([EmptyOperator(task_id=f"empty_{j}") for j in range(5)])
                chain([EmptyOperator(task_id=f"empty2_{j}") for j in range(5)])
                # EmptyOperator.partial(task_id=f"hello2").expand(doc=list(range(5)))
            with TaskGroup(f"group_{i}4"):
                chain([EmptyOperator(task_id=f"empty_{j}") for j in range(5)])
                chain([EmptyOperator(task_id=f"empty2_{j}") for j in range(5)])
                # EmptyOperator.partial(task_id=f"hello2").expand(doc=list(range(5)))

# This comparison is to fall back to DAG timetable when no order_by is provided
if order_by.value == [order_by.get_primary_key_string()]:
Expand All @@ -244,7 +236,7 @@ def get_grid_runs(
filters=[run_after],
limit=limit,
)
return session.execute(dag_runs_select_filter)
return session.scalars(dag_runs_select_filter)


@grid_router.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7102,14 +7102,18 @@ export const $GridRunsResponse = {
run_type: {
'$ref': '#/components/schemas/DagRunType'
},
dag_display_name: {
type: 'string',
title: 'Dag Display Name'
},
duration: {
type: 'integer',
title: 'Duration',
readOnly: true
}
},
type: 'object',
required: ['dag_id', 'run_id', 'queued_at', 'start_date', 'end_date', 'run_after', 'state', 'run_type', 'duration'],
required: ['dag_id', 'run_id', 'queued_at', 'start_date', 'end_date', 'run_after', 'state', 'run_type', 'dag_display_name', 'duration'],
title: 'GridRunsResponse',
description: 'Base Node serializer for responses.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,7 @@ export type GridRunsResponse = {
run_after: string;
state: TaskInstanceState | null;
run_type: DagRunType;
dag_display_name: string;
readonly duration: number;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
DAG_ID_2 = "test_dag_2"
DAG_ID_3 = "test_dag_3"
DAG_ID_4 = "test_dag_4"
DAG_DISPLAY_NAME = "test_dag_display_name"
DAG_DISPLAY_NAME_2 = "test_dag_display_name_2"
DAG_DISPLAY_NAME_3 = "test_dag_display_name_3"
DAG_DISPLAY_NAME_4 = "test_dag_display_name_4"
TASK_ID = "task"
TASK_ID_2 = "task2"
TASK_ID_3 = "task3"
Expand All @@ -57,6 +61,7 @@

GRID_RUN_1 = {
"dag_id": "test_dag",
"dag_display_name": "test_dag_display_name",
"duration": 0,
"end_date": "2024-12-31T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
Expand All @@ -68,6 +73,7 @@

GRID_RUN_2 = {
"dag_id": "test_dag",
"dag_display_name": "test_dag_display_name",
"duration": 0,
"end_date": "2024-12-31T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
Expand Down Expand Up @@ -120,7 +126,7 @@ def setup(dag_maker, session=None):
clear_db_serialized_dags()

# DAG 1
with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
with dag_maker(dag_id=DAG_ID, serialized=True, session=session, dag_display_name=DAG_DISPLAY_NAME) as dag:
task = EmptyOperator(task_id=TASK_ID)

@task_group
Expand Down Expand Up @@ -172,11 +178,13 @@ def mapped_task_group(arg1):
ti.end_date = None

# DAG 2
with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session):
with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session, dag_display_name=DAG_DISPLAY_NAME_2):
EmptyOperator(task_id=TASK_ID_2)

# DAG 3 for testing removed task
with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session) as dag_3:
with dag_maker(
dag_id=DAG_ID_3, serialized=True, session=session, dag_display_name=DAG_DISPLAY_NAME_3
) as dag_3:
EmptyOperator(task_id=TASK_ID_3)
EmptyOperator(task_id=TASK_ID_4)
with TaskGroup(group_id=TASK_GROUP_ID):
Expand All @@ -195,7 +203,7 @@ def mapped_task_group(arg1):
)

# Serialize DAG with only one task
with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session):
with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session, dag_display_name=DAG_DISPLAY_NAME_3):
EmptyOperator(task_id=TASK_ID_3)

run_4 = dag_maker.create_dagrun(
Expand All @@ -216,7 +224,9 @@ def mapped_task_group(arg1):
ti.end_date = None

# DAG 4 for testing removed task
with dag_maker(dag_id=DAG_ID_4, serialized=True, session=session) as dag_4:
with dag_maker(
dag_id=DAG_ID_4, serialized=True, session=session, dag_display_name=DAG_DISPLAY_NAME_4
) as dag_4:
t1 = EmptyOperator(task_id="t1")
t2 = EmptyOperator(task_id="t2")
with TaskGroup(group_id=f"{TASK_GROUP_ID}-1") as tg1:
Expand Down Expand Up @@ -248,6 +258,7 @@ def mapped_task_group(arg1):
ti.end_date = end_date
start_date = end_date
end_date = start_date.add(seconds=2)

session.commit()


Expand Down Expand Up @@ -474,6 +485,7 @@ def test_get_grid_runs(self, session, test_client):
assert response.json() == [
{
"dag_id": "test_dag",
"dag_display_name": "test_dag_display_name",
"duration": 0,
"end_date": "2024-12-31T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
Expand All @@ -484,6 +496,7 @@ def test_get_grid_runs(self, session, test_client):
},
{
"dag_id": "test_dag",
"dag_display_name": "test_dag_display_name",
"duration": 0,
"end_date": "2024-12-31T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
Expand Down
Loading