diff --git a/airflow/api_fastapi/core_api/datamodels/ui/structure.py b/airflow/api_fastapi/core_api/datamodels/ui/structure.py index cb135c141bb36..40c7b9dc0774f 100644 --- a/airflow/api_fastapi/core_api/datamodels/ui/structure.py +++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py @@ -39,7 +39,7 @@ class NodeResponse(BaseModel): label: str tooltip: str | None = None setup_teardown_type: Literal["setup", "teardown"] | None = None - type: Literal["join", "task", "asset_condition"] + type: Literal["join", "task", "asset-condition", "asset", "asset-alias", "dag", "sensor", "trigger"] operator: str | None = None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 0bfe7f6fe631b..3139f12b6b4ad 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -240,6 +240,13 @@ paths: type: boolean default: false title: Include Downstream + - name: external_dependencies + in: query + required: false + schema: + type: boolean + default: false + title: External Dependencies responses: '200': description: Successful Response @@ -8043,7 +8050,12 @@ components: enum: - join - task - - asset_condition + - asset-condition + - asset + - asset-alias + - dag + - sensor + - trigger title: Type operator: anyOf: diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py index de142776ba972..c3b914508d42f 100644 --- a/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -22,6 +22,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils.dag_edges import dag_edges from airflow.utils.task_group import task_group_to_dict @@ -30,7 +31,6 @@ @structure_router.get( "/structure_data", - include_in_schema=False, responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def structure_data( @@ -40,6 +40,7 @@ def structure_data( root: str | None = None, include_upstream: bool = False, include_downstream: bool = False, + external_dependencies: bool = False, ) -> StructureDataResponse: """Get Structure Data.""" dag = request.app.state.dag_bag.get_dag(dag_id) @@ -52,9 +53,7 @@ def structure_data( task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream ) - nodes = [ - task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label) - ] + nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()] edges = dag_edges(dag) data = { @@ -63,4 +62,40 @@ def structure_data( "edges": edges, } + if external_dependencies: + entry_node_ref = nodes[0] if nodes else None + exit_node_ref = nodes[-1] if nodes else None + + start_edges: list[dict] = [] + end_edges: list[dict] = [] + + for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items(): + for dependency in dependencies: + if dependency_dag_id != dag_id and dependency.target != dag_id: + continue + + # Add nodes + nodes.append( + { + "id": dependency.node_id, + "label": dependency.dependency_id, + "type": dependency.dependency_type, + } + ) + + # Add edges + # start dependency + if ( + dependency.source == dependency.dependency_type or dependency.target == dag_id + ) and entry_node_ref: + start_edges.append({"source_id": dependency.node_id, "target_id": entry_node_ref["id"]}) + + # end dependency + elif ( + dependency.target == dependency.dependency_type or dependency.source == dag_id + ) and exit_node_ref: + end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) + + data["edges"] = start_edges + edges + end_edges + return StructureDataResponse(**data) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index be897ab52a52c..d747a10893b50 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -387,11 +387,13 @@ export const useStructureServiceStructureDataKey = export const UseStructureServiceStructureDataKeyFn = ( { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -399,7 +401,9 @@ export const UseStructureServiceStructureDataKeyFn = ( queryKey?: Array, ) => [ useStructureServiceStructureDataKey, - ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]), + ...(queryKey ?? [ + { dagId, externalDependencies, includeDownstream, includeUpstream, root }, + ]), ]; export type BackfillServiceListBackfillsDefaultResponse = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 8b80ae7c08df6..8f455a2409c8c 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -489,6 +489,7 @@ export const prefetchUseDashboardServiceHistoricalMetrics = ( * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -496,11 +497,13 @@ export const prefetchUseStructureServiceStructureData = ( queryClient: QueryClient, { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -509,6 +512,7 @@ export const prefetchUseStructureServiceStructureData = ( queryClient.prefetchQuery({ queryKey: Common.UseStructureServiceStructureDataKeyFn({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, @@ -516,6 +520,7 @@ export const prefetchUseStructureServiceStructureData = ( queryFn: () => StructureService.structureData({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 83404e4bb3494..60bcb9de9c390 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -613,6 +613,7 @@ export const useDashboardServiceHistoricalMetrics = < * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -623,11 +624,13 @@ export const useStructureServiceStructureData = < >( { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -637,12 +640,13 @@ export const useStructureServiceStructureData = < ) => useQuery({ queryKey: Common.UseStructureServiceStructureDataKeyFn( - { dagId, includeDownstream, includeUpstream, root }, + { dagId, externalDependencies, includeDownstream, includeUpstream, root }, queryKey, ), queryFn: () => StructureService.structureData({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 4508a23b5e7b3..f29fb594e2990 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -588,6 +588,7 @@ export const useDashboardServiceHistoricalMetricsSuspense = < * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -598,11 +599,13 @@ export const useStructureServiceStructureDataSuspense = < >( { dagId, + externalDependencies, includeDownstream, includeUpstream, root, }: { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string; @@ -612,12 +615,13 @@ export const useStructureServiceStructureDataSuspense = < ) => useSuspenseQuery({ queryKey: Common.UseStructureServiceStructureDataKeyFn( - { dagId, includeDownstream, includeUpstream, root }, + { dagId, externalDependencies, includeDownstream, includeUpstream, root }, queryKey, ), queryFn: () => StructureService.structureData({ dagId, + externalDependencies, includeDownstream, includeUpstream, root, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 965355b0c1937..79991c087fb24 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3186,7 +3186,16 @@ export const $NodeResponse = { }, type: { type: "string", - enum: ["join", "task", "asset_condition"], + enum: [ + "join", + "task", + "asset-condition", + "asset", + "asset-alias", + "dag", + "sensor", + "trigger", + ], title: "Type", }, operator: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 98ff5677360f5..be4dd239cbe3e 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -742,6 +742,7 @@ export class StructureService { * @param data.root * @param data.includeUpstream * @param data.includeDownstream + * @param data.externalDependencies * @returns StructureDataResponse Successful Response * @throws ApiError */ @@ -756,6 +757,7 @@ export class StructureService { root: data.root, include_upstream: data.includeUpstream, include_downstream: data.includeDownstream, + external_dependencies: data.externalDependencies, }, errors: { 404: "Not Found", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 42196f1a89202..937aacc1239f4 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -773,11 +773,27 @@ export type NodeResponse = { label: string; tooltip?: string | null; setup_teardown_type?: "setup" | "teardown" | null; - type: "join" | "task" | "asset_condition"; + type: + | "join" + | "task" + | "asset-condition" + | "asset" + | "asset-alias" + | "dag" + | "sensor" + | "trigger"; operator?: string | null; }; -export type type = "join" | "task" | "asset_condition"; +export type type = + | "join" + | "task" + | "asset-condition" + | "asset" + | "asset-alias" + | "dag" + | "sensor" + | "trigger"; /** * Request body for Clear Task Instances endpoint. @@ -1443,6 +1459,7 @@ export type HistoricalMetricsResponse = HistoricalMetricDataResponse; export type StructureDataData = { dagId: string; + externalDependencies?: boolean; includeDownstream?: boolean; includeUpstream?: boolean; root?: string | null; diff --git a/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts index 2cd9bae37b4a0..819d03f77d449 100644 --- a/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts +++ b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts @@ -211,7 +211,7 @@ const generateElkGraph = ({ if (node.type === "join") { width = 10; height = 10; - } else if (node.type === "asset_condition") { + } else if (node.type === "asset-condition") { width = 30; height = 30; } diff --git a/airflow/www/views.py b/airflow/www/views.py index b12092ffe3f1e..fa60f732e725e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3228,6 +3228,7 @@ def extra_links(self, *, session: Session = NEW_SESSION): else: return {"url": None, "error": f"No URL found for {link_name}"}, 404 + @mark_fastapi_migration_done @expose("/object/graph_data") @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) @gzipped diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py index e270a49c6e813..0596b0f43e0fe 100644 --- a/tests/api_fastapi/core_api/routes/ui/test_structure.py +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -22,12 +22,16 @@ from airflow.models import DagBag from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor +from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset from tests_common.test_utils.db import clear_db_runs pytestmark = pytest.mark.db_test DAG_ID = "test_dag_id" +DAG_ID_EXTERNAL_TRIGGER = "external_trigger" @pytest.fixture(autouse=True, scope="module") @@ -46,13 +50,32 @@ def clean(): @pytest.fixture def make_dag(dag_maker, session, time_machine): + with dag_maker( + dag_id=DAG_ID_EXTERNAL_TRIGGER, + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + TriggerDagRunOperator(task_id="trigger_dag_run_operator", trigger_dag_id=DAG_ID) + + dag_maker.dagbag.sync_to_db() + with dag_maker( dag_id=DAG_ID, serialized=True, session=session, start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + schedule=( + Asset(uri="s3://bucket/next-run-asset/1", name="asset1") + & Asset(uri="s3://bucket/next-run-asset/2", name="asset2") + & AssetAlias("example-alias") + ), ): - EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") + ( + EmptyOperator(task_id="task_1", outlets=[Dataset(uri="s3://dataset-bucket/example.csv")]) + >> ExternalTaskSensor(task_id="external_task_sensor", external_dag_id=DAG_ID) + >> EmptyOperator(task_id="task_2") + ) dag_maker.dagbag.sync_to_db() @@ -64,14 +87,19 @@ class TestStructureDataEndpoint: ( {"dag_id": DAG_ID}, { - "arrange": "LR", "edges": [ { "is_setup_teardown": None, "label": None, - "source_id": "task_1", + "source_id": "external_task_sensor", "target_id": "task_2", }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + }, ], "nodes": [ { @@ -79,22 +107,33 @@ class TestStructureDataEndpoint: "id": "task_1", "is_mapped": None, "label": "task_1", - "operator": "EmptyOperator", + "tooltip": None, "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + }, + { + "children": None, + "id": "external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", "tooltip": None, + "setup_teardown_type": None, "type": "task", + "operator": "ExternalTaskSensor", }, { "children": None, "id": "task_2", "is_mapped": None, "label": "task_2", - "operator": "EmptyOperator", - "setup_teardown_type": None, "tooltip": None, + "setup_teardown_type": None, "type": "task", + "operator": "EmptyOperator", }, ], + "arrange": "LR", }, ), ( @@ -128,6 +167,193 @@ class TestStructureDataEndpoint: ], }, ), + ( + { + "dag_id": DAG_ID, + "external_dependencies": True, + }, + { + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset:asset1", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset:asset2", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "asset-alias:example-alias", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "target_id": "task_1", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "external_task_sensor", + "target_id": "task_2", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_2", + "target_id": "asset:s3://dataset-bucket/example.csv", + }, + ], + "nodes": [ + { + "children": None, + "id": "task_1", + "is_mapped": None, + "label": "task_1", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + }, + { + "children": None, + "id": "external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "ExternalTaskSensor", + }, + { + "children": None, + "id": "task_2", + "is_mapped": None, + "label": "task_2", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "EmptyOperator", + }, + { + "children": None, + "id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "trigger", + "operator": None, + }, + { + "children": None, + "id": "asset:asset1", + "is_mapped": None, + "label": "asset1", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + }, + { + "children": None, + "id": "asset:asset2", + "is_mapped": None, + "label": "asset2", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + }, + { + "children": None, + "id": "asset-alias:example-alias", + "is_mapped": None, + "label": "example-alias", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset-alias", + "operator": None, + }, + { + "children": None, + "id": "asset:s3://dataset-bucket/example.csv", + "is_mapped": None, + "label": "s3://dataset-bucket/example.csv", + "tooltip": None, + "setup_teardown_type": None, + "type": "asset", + "operator": None, + }, + { + "children": None, + "id": "sensor:test_dag_id:test_dag_id:external_task_sensor", + "is_mapped": None, + "label": "external_task_sensor", + "tooltip": None, + "setup_teardown_type": None, + "type": "sensor", + "operator": None, + }, + ], + "arrange": "LR", + }, + ), + ( + {"dag_id": DAG_ID_EXTERNAL_TRIGGER, "external_dependencies": True}, + { + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "trigger_dag_run_operator", + "target_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + } + ], + "nodes": [ + { + "children": None, + "id": "trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "task", + "operator": "TriggerDagRunOperator", + }, + { + "children": None, + "id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator", + "is_mapped": None, + "label": "trigger_dag_run_operator", + "tooltip": None, + "setup_teardown_type": None, + "type": "trigger", + "operator": None, + }, + ], + "arrange": "LR", + }, + ), ], ) @pytest.mark.usefixtures("make_dag")