From faa660a0635708510cef698a02d2346ce1b13b27 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Thu, 5 Dec 2024 18:03:02 +0100 Subject: [PATCH 1/5] Working state --- .../core_api/datamodels/ui/structure.py | 2 +- .../core_api/openapi/v1-generated.yaml | 14 ++- .../core_api/routes/ui/structure.py | 27 +++++ airflow/ui/openapi-gen/queries/common.ts | 6 +- airflow/ui/openapi-gen/queries/prefetch.ts | 5 + airflow/ui/openapi-gen/queries/queries.ts | 6 +- airflow/ui/openapi-gen/queries/suspense.ts | 6 +- .../ui/openapi-gen/requests/schemas.gen.ts | 11 +- .../ui/openapi-gen/requests/services.gen.ts | 2 + airflow/ui/openapi-gen/requests/types.gen.ts | 21 +++- .../layouts/Details/Graph/useGraphLayout.ts | 2 +- .../core_api/routes/ui/test_structure.py | 113 +++++++++++++++++- 12 files changed, 205 insertions(+), 10 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/ui/structure.py b/airflow/api_fastapi/core_api/datamodels/ui/structure.py index cb135c141bb36..40c7b9dc0774f 100644 --- a/airflow/api_fastapi/core_api/datamodels/ui/structure.py +++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py @@ -39,7 +39,7 @@ class NodeResponse(BaseModel): label: str tooltip: str | None = None setup_teardown_type: Literal["setup", "teardown"] | None = None - type: Literal["join", "task", "asset_condition"] + type: Literal["join", "task", "asset-condition", "asset", "asset-alias", "dag", "sensor", "trigger"] operator: str | None = None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 0bfe7f6fe631b..3139f12b6b4ad 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -240,6 +240,13 @@ paths: type: boolean default: false title: Include Downstream + - name: external_dependencies + in: query + required: false + schema: + type: boolean + default: false + title: External Dependencies responses: '200': description: Successful Response @@ -8043,7 +8050,12 @@ components: enum: - join - task - - asset_condition + - asset-condition + - asset + - asset-alias + - dag + - sensor + - trigger title: Type operator: anyOf: diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index de142776ba972..f8de61647a6c2 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -22,6 +22,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils.dag_edges import dag_edges from airflow.utils.task_group import task_group_to_dict @@ -40,6 +41,7 @@ def structure_data( root: str | None = None, include_upstream: bool = False, include_downstream: bool = False, + external_dependencies: bool = False, ) -> StructureDataResponse: """Get Structure Data.""" dag = request.app.state.dag_bag.get_dag(dag_id) @@ -63,4 +65,29 @@ def structure_data( "edges": edges, } + if external_dependencies: + dependencies = SerializedDagModel.get_dag_dependencies()[dag_id] + + entry_node_ref = nodes[0] + exit_node_ref = nodes[-1] + + for dependency in dependencies: + # Add nodes + nodes.append( + { + "id": dependency.node_id, + "label": dependency.node_id, + "type": dependency.dependency_type, + } + ) + + # Add edges + # start dependency + if dependency.target != dependency.dependency_type: + edges.insert(0, {"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) + + # end dependency + elif dependency.source != dependency.dependency_type: + edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) + return StructureDataResponse(**data) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index be897ab52a52c..d747a10893b50 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -387,11 +387,13 @@ export const useStructureServiceStructureDataKey = export const UseStructureServiceStructureDataKeyFn = ( { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -399,7 +401,9 @@ export const UseStructureServiceStructureDataKeyFn = ( queryKey?: Array, ) => [ useStructureServiceStructureDataKey, - ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]), + ...(queryKey ?? [ + { dagId, externalDependencies, includeDownstream, includeUpstream, root }, + ]), ]; export type BackfillServiceListBackfillsDefaultResponse = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 8b80ae7c08df6..8f455a2409c8c 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -489,6 +489,7 @@ export const prefetchUseDashboardServiceHistoricalMetrics = ( * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -496,11 +497,13 @@ export const prefetchUseStructureServiceStructureData = ( queryClient: QueryClient, { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -509,6 +512,7 @@ export const prefetchUseStructureServiceStructureData = ( queryClient.prefetchQuery({ queryKey: Common.UseStructureServiceStructureDataKeyFn({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, @@ -516,6 +520,7 @@ export const prefetchUseStructureServiceStructureData = ( queryFn: () => StructureService.structureData({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 83404e4bb3494..60bcb9de9c390 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -613,6 +613,7 @@ export const useDashboardServiceHistoricalMetrics = < * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -623,11 +624,13 @@ export const useStructureServiceStructureData = < >( { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -637,12 +640,13 @@ export const useStructureServiceStructureData = < ) => useQuery({ queryKey: Common.UseStructureServiceStructureDataKeyFn( - { dagId, includeDownstream, includeUpstream, root }, + { dagId, externalDependencies, includeDownstream, includeUpstream, root }, queryKey, ), queryFn: () => StructureService.structureData({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 4508a23b5e7b3..f29fb594e2990 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -588,6 +588,7 @@ export const useDashboardServiceHistoricalMetricsSuspense = < * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -598,11 +599,13 @@ export const useStructureServiceStructureDataSuspense = < >( { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -612,12 +615,13 @@ export const useStructureServiceStructureDataSuspense = < ) => useSuspenseQuery({ queryKey: Common.UseStructureServiceStructureDataKeyFn( - { dagId, includeDownstream, includeUpstream, root }, + { dagId, externalDependencies, includeDownstream, includeUpstream, root }, queryKey, ), queryFn: () => StructureService.structureData({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 965355b0c1937..79991c087fb24 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3186,7 +3186,16 @@ export const $NodeResponse = { }, type: { type: "string", - enum: ["join", "task", "asset_condition"], + enum: [ + "join", + "task", + "asset-condition", + "asset", + "asset-alias", + "dag", + "sensor", + "trigger", + ], title: "Type", }, operator: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 98ff5677360f5..be4dd239cbe3e 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -742,6 +742,7 @@ export class StructureService { * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -756,6 +757,7 @@ export class StructureService { root: data.root, include_upstream: data.includeUpstream, include_downstream: data.includeDownstream, + external_dependencies: data.externalDependencies, }, errors: { 404: "Not Found", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 42196f1a89202..937aacc1239f4 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -773,11 +773,27 @@ export type NodeResponse = { label: string; tooltip?: string | null; setup_teardown_type?: "setup" | "teardown" | null; - type: "join" | "task" | "asset_condition"; + type: + | "join" + | "task" + | "asset-condition" + | "asset" + | "asset-alias" + | "dag" + | "sensor" + | "trigger"; operator?: string | null; }; -export type type = "join" | "task" | "asset_condition"; +export type type = + | "join" + | "task" + | "asset-condition" + | "asset" + | "asset-alias" + | "dag" + | "sensor" + | "trigger"; /** * Request body for Clear Task Instances endpoint. @@ -1443,6 +1459,7 @@ export type HistoricalMetricsResponse = HistoricalMetricDataResponse; export type StructureDataData = { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string | null; diff --git a/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts index 2cd9bae37b4a0..819d03f77d449 100644 --- a/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts +++ b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts @@ -211,7 +211,7 @@ const generateElkGraph = ({ if (node.type === "join") { width = 10; height = 10; - } else if (node.type === "asset_condition") { + } else if (node.type === "asset-condition") { width = 30; height = 30; } diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index e270a49c6e813..52e5b0d107cda 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -22,6 +22,7 @@ from airflow.models import DagBag from airflow.operators.empty import EmptyOperator +from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset from tests_common.test_utils.db import clear_db_runs @@ -51,8 +52,15 @@ def make_dag(dag_maker, session, time_machine): serialized=True, session=session, start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + schedule=( + Asset(uri="s3://bucket/next-run-asset/1", name="asset1") + & Asset(uri="s3://bucket/next-run-asset/2", name="asset2") + & AssetAlias("example-alias") + ), ): - EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") + EmptyOperator( + task_id="task_1", outlets=[Dataset(uri="s3://dataset-bucket/example.csv")] + ) >> EmptyOperator(task_id="task_2") dag_maker.dagbag.sync_to_db() @@ -128,6 +136,109 @@ class TestStructureDataEndpoint: ], }, ), + ( + { + "dag_id": DAG_ID, + "external_dependencies": True, + }, + { + "arrange": "LR", + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset-alias:example-alias", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset:asset2", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset:asset1", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "task_2", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_2", + "target_id": "asset:s3://dataset-bucket/example.csv", + }, + ], + "nodes": [ + { + "children": None, + "id": "task_1", + "is_mapped": None, + "label": "task_1", + "operator": "EmptyOperator", + "setup_teardown_type": None, + "tooltip": None, + "type": "task", + }, + { + "children": None, + "id": "task_2", + "is_mapped": None, + "label": "task_2", + "operator": "EmptyOperator", + "setup_teardown_type": None, + "tooltip": None, + "type": "task", + }, + { + "children": None, + "id": "asset:asset1", + "is_mapped": None, + "label": "asset:asset1", + "operator": None, + "setup_teardown_type": None, + "tooltip": None, + "type": "asset", + }, + { + "children": None, + "id": "asset:asset2", + "is_mapped": None, + "label": "asset:asset2", + "operator": None, + "setup_teardown_type": None, + "tooltip": None, + "type": "asset", + }, + { + "children": None, + "id": "asset-alias:example-alias", + "is_mapped": None, + "label": "asset-alias:example-alias", + "operator": None, + "setup_teardown_type": None, + "tooltip": None, + "type": "asset-alias", + }, + { + "children": None, + "id": "asset:s3://dataset-bucket/example.csv", + "is_mapped": None, + "label": "asset:s3://dataset-bucket/example.csv", + "operator": None, + "setup_teardown_type": None, + "tooltip": None, + "type": "asset", + }, + ], + }, + ), ], ) @pytest.mark.usefixtures("make_dag") From 21fbf98cd3f9c89f1c6633fb315deefb65db946b Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Tue, 10 Dec 2024 10:39:50 +0100 Subject: [PATCH 2/5] Add tests for external task sensor --- .../core_api/routes/ui/structure.py | 4 +- .../core_api/routes/ui/test_structure.py | 93 ++++++++++++++----- 2 files changed, 73 insertions(+), 24 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index f8de61647a6c2..85c6ed3277a83 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -54,9 +54,7 @@ def structure_data( task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream ) - nodes = [ - task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label) - ] + nodes = [task_group_to_dict(child) for child in dag.task_group.children.values()] edges = dag_edges(dag) data = { diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index 52e5b0d107cda..5d81f77d59b26 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -22,6 +22,7 @@ from airflow.models import DagBag from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset from tests_common.test_utils.db import clear_db_runs @@ -58,9 +59,11 @@ def make_dag(dag_maker, session, time_machine): & AssetAlias("example-alias") ), ): - EmptyOperator( - task_id="task_1", outlets=[Dataset(uri="s3://dataset-bucket/example.csv")] - ) >> EmptyOperator(task_id="task_2") + ( + EmptyOperator(task_id="task_1", outlets=[Dataset(uri="s3://dataset-bucket/example.csv")]) + >> ExternalTaskSensor(task_id="external_task_sensor", external_dag_id=DAG_ID) + >> EmptyOperator(task_id="task_2") + ) dag_maker.dagbag.sync_to_db() @@ -72,14 +75,19 @@ class TestStructureDataEndpoint: ( {"dag_id": DAG_ID}, { - "arrange": "LR", "edges": [ { "is_setup_teardown": None, "label": None, - "source_id": "task_1", + "source_id": "external_task_sensor", "target_id": "task_2", }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + }, ], "nodes": [ { @@ -87,22 +95,33 @@ class TestStructureDataEndpoint: "id": "task_1", "is_mapped": None, "label": "task_1", - "operator": "EmptyOperator", + "tooltip": None, "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + }, + { + "children": None, + "id": "external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", "tooltip": None, + "setup_teardown_type": None, "type": "task", + "operator": "ExternalTaskSensor", }, { "children": None, "id": "task_2", "is_mapped": None, "label": "task_2", - "operator": "EmptyOperator", - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "task", + "operator": "EmptyOperator", }, ], + "arrange": "LR", }, ), ( @@ -142,8 +161,13 @@ class TestStructureDataEndpoint: "external_dependencies": True, }, { - "arrange": "LR", "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "target_id": "task_1", + }, { "is_setup_teardown": None, "label": None, @@ -165,9 +189,15 @@ class TestStructureDataEndpoint: { "is_setup_teardown": None, "label": None, - "source_id": "task_1", + "source_id": "external_task_sensor", "target_id": "task_2", }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + }, { "is_setup_teardown": None, "label": None, @@ -181,62 +211,83 @@ class TestStructureDataEndpoint: "id": "task_1", "is_mapped": None, "label": "task_1", - "operator": "EmptyOperator", + "tooltip": None, "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + }, + { + "children": None, + "id": "external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", "tooltip": None, + "setup_teardown_type": None, "type": "task", + "operator": "ExternalTaskSensor", }, { "children": None, "id": "task_2", "is_mapped": None, "label": "task_2", - "operator": "EmptyOperator", - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "task", + "operator": "EmptyOperator", }, { "children": None, "id": "asset:asset1", "is_mapped": None, "label": "asset:asset1", - "operator": None, - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "asset", + "operator": None, }, { "children": None, "id": "asset:asset2", "is_mapped": None, "label": "asset:asset2", - "operator": None, - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "asset", + "operator": None, }, { "children": None, "id": "asset-alias:example-alias", "is_mapped": None, "label": "asset-alias:example-alias", - "operator": None, - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "asset-alias", + "operator": None, }, { "children": None, "id": "asset:s3://dataset-bucket/example.csv", "is_mapped": None, "label": "asset:s3://dataset-bucket/example.csv", - "operator": None, - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "asset", + "operator": None, + }, + { + "children": None, + "id": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "is_mapped": None, + "label": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "sensor", + "operator": None, }, ], + "arrange": "LR", }, ), ], From 119b80b0f6d39eacbdb50c7677179c6b5ecd874a Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Tue, 10 Dec 2024 14:34:27 +0100 Subject: [PATCH 3/5] Add test for TriggerDagRunOperator --- .../core_api/routes/ui/structure.py | 40 ++++++++++--------- .../core_api/routes/ui/test_structure.py | 27 +++++++++++++ 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index 85c6ed3277a83..2b7cfef02bd5b 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -54,7 +54,7 @@ def structure_data( task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream ) - nodes = [task_group_to_dict(child) for child in dag.task_group.children.values()] + nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()] edges = dag_edges(dag) data = { @@ -64,28 +64,30 @@ def structure_data( } if external_dependencies: - dependencies = SerializedDagModel.get_dag_dependencies()[dag_id] - entry_node_ref = nodes[0] exit_node_ref = nodes[-1] - for dependency in dependencies: - # Add nodes - nodes.append( - { - "id": dependency.node_id, - "label": dependency.node_id, - "type": dependency.dependency_type, - } - ) + for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items(): + for dependency in dependencies: + if dependency_dag_id != dag_id and dependency.target != dag_id: + continue + + # Add nodes + nodes.append( + { + "id": dependency.node_id, + "label": dependency.node_id, + "type": dependency.dependency_type, + } + ) - # Add edges - # start dependency - if dependency.target != dependency.dependency_type: - edges.insert(0, {"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) + # Add edges + # start dependency + if dependency.target != dependency.dependency_type: + edges.insert(0, {"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) - # end dependency - elif dependency.source != dependency.dependency_type: - edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) + # end dependency + elif dependency.source != dependency.dependency_type: + edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) return StructureDataResponse(**data) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index 5d81f77d59b26..3b33a68433a6a 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -22,6 +22,7 @@ from airflow.models import DagBag from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset @@ -48,6 +49,16 @@ def clean(): @pytest.fixture def make_dag(dag_maker, session, time_machine): + with dag_maker( + dag_id="external_trigger", + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + TriggerDagRunOperator(task_id="trigger_dag_run_operator", trigger_dag_id=DAG_ID) + + dag_maker.dagbag.sync_to_db() + with dag_maker( dag_id=DAG_ID, serialized=True, @@ -186,6 +197,12 @@ class TestStructureDataEndpoint: "source_id": "asset:asset1", "target_id": "task_1", }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "target_id": "task_1", + }, { "is_setup_teardown": None, "label": None, @@ -236,6 +253,16 @@ class TestStructureDataEndpoint: "type": "task", "operator": "EmptyOperator", }, + { + "children": None, + "id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "trigger", + "operator": None, + }, { "children": None, "id": "asset:asset1", From 87b5121bfdd68f87fa1cbf5fbcc9f3492e96b361 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Wed, 11 Dec 2024 12:03:44 +0100 Subject: [PATCH 4/5] Fix following code review --- .../api_fastapi/core_api/routes/ui/structure.py | 17 +++++++++++------ .../core_api/routes/ui/test_structure.py | 8 ++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index 2b7cfef02bd5b..72fa780da1fe0 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -64,8 +64,11 @@ def structure_data( } if external_dependencies: - entry_node_ref = nodes[0] - exit_node_ref = nodes[-1] + entry_node_ref = nodes[0] if nodes else None + exit_node_ref = nodes[-1] if nodes else None + + start_edges: list[dict] = [] + end_edges: list[dict] = [] for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items(): for dependency in dependencies: @@ -83,11 +86,13 @@ def structure_data( # Add edges # start dependency - if dependency.target != dependency.dependency_type: - edges.insert(0, {"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) + if dependency.target != dependency.dependency_type and entry_node_ref: + start_edges.append({"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) # end dependency - elif dependency.source != dependency.dependency_type: - edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) + elif dependency.source != dependency.dependency_type and exit_node_ref: + end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) + + data["edges"] = start_edges + edges + end_edges return StructureDataResponse(**data) diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index 3b33a68433a6a..56c6db55d8e55 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -176,13 +176,13 @@ class TestStructureDataEndpoint: { "is_setup_teardown": None, "label": None, - "source_id": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "source_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", "target_id": "task_1", }, { "is_setup_teardown": None, "label": None, - "source_id": "asset-alias:example-alias", + "source_id": "asset:asset1", "target_id": "task_1", }, { @@ -194,13 +194,13 @@ class TestStructureDataEndpoint: { "is_setup_teardown": None, "label": None, - "source_id": "asset:asset1", + "source_id": "asset-alias:example-alias", "target_id": "task_1", }, { "is_setup_teardown": None, "label": None, - "source_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "source_id": "sensor:test_dag_id:test_dag_id:external_task_sensor", "target_id": "task_1", }, { From 4aaeb1cd442c6af1245b1f1352f559a9575e227b Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Thu, 12 Dec 2024 16:26:00 +0100 Subject: [PATCH 5/5] Update following code review --- .../core_api/routes/ui/structure.py | 11 ++-- airflow/www/views.py | 1 + .../core_api/routes/ui/test_structure.py | 51 ++++++++++++++++--- 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index 72fa780da1fe0..c3b914508d42f 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -31,7 +31,6 @@ @structure_router.get( "/structure_data", - include_in_schema=False, responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def structure_data( @@ -79,18 +78,22 @@ def structure_data( nodes.append( { "id": dependency.node_id, - "label": dependency.node_id, + "label": dependency.dependency_id, "type": dependency.dependency_type, } ) # Add edges # start dependency - if dependency.target != dependency.dependency_type and entry_node_ref: + if ( + dependency.source == dependency.dependency_type or dependency.target == dag_id + ) and entry_node_ref: start_edges.append({"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) # end dependency - elif dependency.source != dependency.dependency_type and exit_node_ref: + elif ( + dependency.target == dependency.dependency_type or dependency.source == dag_id + ) and exit_node_ref: end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) data["edges"] = start_edges + edges + end_edges diff --git a/airflow/www/views.py b/airflow/www/views.py index b12092ffe3f1e..fa60f732e725e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3228,6 +3228,7 @@ def extra_links(self, *, session: Session = NEW_SESSION): else: return {"url": None, "error": f"No URL found for {link_name}"}, 404 + @mark_fastapi_migration_done @expose("/object/graph_data") @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) @gzipped diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index 56c6db55d8e55..0596b0f43e0fe 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -31,6 +31,7 @@ pytestmark = pytest.mark.db_test DAG_ID = "test_dag_id" +DAG_ID_EXTERNAL_TRIGGER = "external_trigger" @pytest.fixture(autouse=True, scope="module") @@ -50,7 +51,7 @@ def clean(): @pytest.fixture def make_dag(dag_maker, session, time_machine): with dag_maker( - dag_id="external_trigger", + dag_id=DAG_ID_EXTERNAL_TRIGGER, serialized=True, session=session, start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), @@ -257,7 +258,7 @@ class TestStructureDataEndpoint: "children": None, "id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", "is_mapped": None, - "label": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "label": "trigger_dag_run_operator", "tooltip": None, "setup_teardown_type": None, "type": "trigger", @@ -267,7 +268,7 @@ class TestStructureDataEndpoint: "children": None, "id": "asset:asset1", "is_mapped": None, - "label": "asset:asset1", + "label": "asset1", "tooltip": None, "setup_teardown_type": None, "type": "asset", @@ -277,7 +278,7 @@ class TestStructureDataEndpoint: "children": None, "id": "asset:asset2", "is_mapped": None, - "label": "asset:asset2", + "label": "asset2", "tooltip": None, "setup_teardown_type": None, "type": "asset", @@ -287,7 +288,7 @@ class TestStructureDataEndpoint: "children": None, "id": "asset-alias:example-alias", "is_mapped": None, - "label": "asset-alias:example-alias", + "label": "example-alias", "tooltip": None, "setup_teardown_type": None, "type": "asset-alias", @@ -297,7 +298,7 @@ class TestStructureDataEndpoint: "children": None, "id": "asset:s3://dataset-bucket/example.csv", "is_mapped": None, - "label": "asset:s3://dataset-bucket/example.csv", + "label": "s3://dataset-bucket/example.csv", "tooltip": None, "setup_teardown_type": None, "type": "asset", @@ -307,7 +308,7 @@ class TestStructureDataEndpoint: "children": None, "id": "sensor:test_dag_id:test_dag_id:external_task_sensor", "is_mapped": None, - "label": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "label": "external_task_sensor", "tooltip": None, "setup_teardown_type": None, "type": "sensor", @@ -317,6 +318,42 @@ class TestStructureDataEndpoint: "arrange": "LR", }, ), + ( + {"dag_id": DAG_ID_EXTERNAL_TRIGGER, "external_dependencies": True}, + { + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "trigger_dag_run_operator", + "target_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + } + ], + "nodes": [ + { + "children": None, + "id": "trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "TriggerDagRunOperator", + }, + { + "children": None, + "id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "trigger", + "operator": None, + }, + ], + "arrange": "LR", + }, + ), ], ) @pytest.mark.usefixtures("make_dag")